diff --git a/src/network.zig b/src/network.zig index f4480adf..55a6b188 100644 --- a/src/network.zig +++ b/src/network.zig @@ -1,6 +1,7 @@ const std = @import("std"); const Allocator = std.mem.Allocator; +const chunk = @import("chunk.zig"); const main = @import("main.zig"); const game = @import("game.zig"); const settings = @import("settings.zig"); @@ -308,6 +309,7 @@ const STUN = struct { pub const ConnectionManager = struct { socket: Socket = undefined, thread: std.Thread = undefined, + threadId: std.Thread.Id = undefined, externalAddress: ?Address = null, online: bool = false, running: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), @@ -346,7 +348,7 @@ pub const ConnectionManager = struct { Socket.deinit(self.socket); for(self.connections.items) |conn| { - conn.disconnect(); + conn.disconnect() catch |err| {std.log.warn("Error while disconnecting: {s}", .{@errorName(err)});}; } self.connections.deinit(); for(self.requests.items) |request| { @@ -424,8 +426,8 @@ pub const ConnectionManager = struct { } fn onReceive(self: *ConnectionManager, data: []const u8, source: Address) !void { + std.debug.assert(self.threadId == std.Thread.getCurrentId()); self.mutex.lock(); - defer self.mutex.unlock(); for(self.connections.items) |conn| { if(conn.remoteAddress.ip == source.ip) { @@ -434,11 +436,13 @@ pub const ConnectionManager = struct { conn.bruteforcingPort = false; } if(conn.remoteAddress.port == source.port) { + self.mutex.unlock(); try conn.receive(data); return; } } } + defer self.mutex.unlock(); // Check if it's part of an active request: for(self.requests.items) |request| { if(request.address.ip == source.ip and request.address.port == source.port) { @@ -454,6 +458,7 @@ pub const ConnectionManager = struct { } pub fn run(self: *ConnectionManager) !void { + self.threadId = std.Thread.getCurrentId(); var gpa = std.heap.GeneralPurposeAllocator(.{}){}; main.threadAllocator = gpa.allocator(); defer if(gpa.deinit()) { @@ -485,7 +490,7 @@ pub const ConnectionManager = struct { std.log.info("timeout", .{}); // Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times). self.mutex.unlock(); - conn.disconnect(); + try conn.disconnect(); self.mutex.lock(); } else { try conn.sendKeepAlive(); @@ -517,10 +522,13 @@ fn addProtocol(comptime comptimeList: *[256]?*const fn(*Connection, []const u8) return prot; } +pub var bytesReceived: [256]usize = [_]usize {0} ** 256; +pub var packetsReceived: [256]usize = [_]usize {0} ** 256; pub const Protocols = blk: { comptime var comptimeList = [_]?*const fn(*Connection, []const u8) anyerror!void{null} ** 256; const Protocols_struct = struct { list: [256]?*const fn(*Connection, []const u8) anyerror!void, + keepAlive: u8 = 0, important: u8 = 0xff, handShake: type = addProtocol(&comptimeList, struct { @@ -627,15 +635,52 @@ pub const Protocols = blk: { conn.mutex.unlock(); } }), + chunkRequest: type = addProtocol(&comptimeList, struct { + const id: u8 = 2; + fn receive(conn: *Connection, data: []const u8) !void { + var remaining = data[0..]; + while(remaining.len >= 16) { + const request = chunk.ChunkPosition{ + .wx = std.mem.readIntBig(chunk.ChunkCoordinate, data[0..4]), + .wy = std.mem.readIntBig(chunk.ChunkCoordinate, data[4..8]), + .wz = std.mem.readIntBig(chunk.ChunkCoordinate, data[8..12]), + .voxelSize = std.mem.readIntBig(chunk.ChunkCoordinate, data[12..16]), + }; + _ = request; + _ = conn; + // TODO: Server.world.queueChunk(request, (User)conn); + remaining = remaining[16..]; + } + } + pub fn sendRequest(conn: *Connection, requests: []chunk.ChunkPosition) !void { + if(requests.len == 0) return; + var data = main.threadAllocator.alloc(16*requests.len); + defer main.threadAllocator.free(data); + var remaining = data; + for(requests) |req| { + std.mem.writeIntBig(chunk.ChunkCoordinate, data[0..4], req.wx); + std.mem.writeIntBig(chunk.ChunkCoordinate, data[4..8], req.wy); + std.mem.writeIntBig(chunk.ChunkCoordinate, data[8..12], req.wz); + std.mem.writeIntBig(chunk.ChunkCoordinate, data[12..16], req.voxelSize); + remaining = remaining[16..]; + } + conn.sendImportant(id, data); + } + }), + disconnect: type = addProtocol(&comptimeList, struct { + const id: u8 = 5; + fn receive(conn: *Connection, _: []const u8) !void { + try conn.disconnect(); + } + pub fn disconnect(conn: *Connection) !void { + const noData = [0]u8 {}; + try conn.sendUnimportant(id, &noData); + } + }), }; break :blk Protocols_struct{.list = comptimeList}; }; //public final class Protocols { -// public static final int[] bytesReceived = new int[256]; -// public static final int[] packetsReceived = new int[256]; -// -// public static final byte KEEP_ALIVE = 0; -// public static final byte IMPORTANT_PACKET = (byte)0xff; // public static final HandshakeProtocol HANDSHAKE = new HandshakeProtocol(); // public static final ChunkRequestProtocol CHUNK_REQUEST = new ChunkRequestProtocol(); // public static final ChunkTransmissionProtocol CHUNK_TRANSMISSION = new ChunkTransmissionProtocol(); @@ -733,11 +778,16 @@ pub const Connection = struct { } pub fn deinit(self: *Connection) void { - self.disconnect(); + self.disconnect() catch |err| {std.log.warn("Error while disconnecting: {s}", .{@errorName(err)});}; self.unconfirmedPackets.deinit(); self.receivedPackets[0].deinit(); self.receivedPackets[1].deinit(); self.receivedPackets[2].deinit(); + for(self.lastReceivedPackets) |nullablePacket| { + if(nullablePacket) |packet| { + self.allocator.free(packet); + } + } var gpa = self.gpa; gpa.allocator().destroy(self); if(gpa.deinit()) { @@ -807,10 +857,11 @@ pub const Connection = struct { defer main.threadAllocator.free(fullData); fullData[0] = id; std.mem.copy(u8, fullData[1..], data); - self.manager.send(fullData, self.remoteAddress); + try self.manager.send(fullData, self.remoteAddress); } fn receiveKeepAlive(self: *Connection, data: []const u8) void { + std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); self.mutex.lock(); defer self.mutex.unlock(); @@ -835,6 +886,7 @@ pub const Connection = struct { } fn sendKeepAlive(self: *Connection) !void { + std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); self.mutex.lock(); defer self.mutex.unlock(); @@ -900,7 +952,7 @@ pub const Connection = struct { // Resend packets that didn't receive confirmation within the last 2 keep-alive signals. for(self.unconfirmedPackets.items) |*packet| { - if(self.lastKeepAliveReceived -% packet.lastKeepAliveSentBefore >= 2) { + if(self.lastKeepAliveReceived -% @as(i33, packet.lastKeepAliveSentBefore) >= 2) { packetsSent += 1; packetsResent += 1; try self.manager.send(packet.data, self.remoteAddress); @@ -932,6 +984,7 @@ pub const Connection = struct { } fn collectPackets(self: *Connection) !void { + std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); while(true) { var id = self.lastIncompletePacket; var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return; @@ -987,26 +1040,24 @@ pub const Connection = struct { self.lastReceivedPackets[self.lastIncompletePacket & 65535] = null; } self.lastIndex = newIndex; - // TODO: -// Protocols.bytesReceived[protocol & 0xff] += data.length + 1; + bytesReceived[protocol] += data.len + 1 + (7 + std.math.log2_int(usize, 1 + data.len))/7; if(Protocols.list[protocol]) |prot| { try prot(self, data); } else { - std.log.warn("Received unknown protocol width id {}", .{protocol}); + std.log.warn("Received unknown important protocol width id {}", .{protocol}); } } } pub fn receive(self: *Connection, data: []const u8) !void { + std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); const protocol = data[0]; - // TODO: if(self.handShakeState != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) { return; // Reject all non-handshake packets until the handshake is done. } self.lastConnection = std.time.milliTimestamp(); - // TODO: -// Protocols.bytesReceived[protocol & 0xff] += len + 20 + 8; // Including IP header and udp header -// Protocols.packetsReceived[protocol & 0xff]++; + bytesReceived[protocol] += data.len + 20 + 8; // Including IP header and udp header; + packetsReceived[protocol] += 1; if(protocol == Protocol.important) { var id = std.mem.readIntBig(u32, data[1..5]); if(self.handShakeState == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected. @@ -1027,7 +1078,7 @@ pub const Connection = struct { // Logger.error("Server 'reconnected'? This makes no sense and the game can't handle that."); // } } - if(id - self.lastIncompletePacket >= 65536) { + if(id - @as(i33, self.lastIncompletePacket) >= 65536) { std.log.warn("Many incomplete packages. Cannot process any more packages for now.", .{}); return; } @@ -1041,19 +1092,22 @@ pub const Connection = struct { } else if(protocol == Protocol.keepAlive) { self.receiveKeepAlive(data[1..]); } else { - // TODO: Protocols.list[protocol & 0xff].receive(this, data, 1, len - 1); + if(Protocols.list[protocol]) |prot| { + try prot(self, data); + } else { + std.log.warn("Received unknown protocol width id {}", .{protocol}); + } } } - pub fn disconnect(self: *Connection) void { + pub fn disconnect(self: *Connection) !void { // Send 3 disconnect packages to the other side, just to be sure. // If all of them don't get through then there is probably a network issue anyways which would lead to a timeout. - // TODO: -// Protocols.DISCONNECT.disconnect(this); -// try {Thread.sleep(10);} catch(Exception e) {} -// Protocols.DISCONNECT.disconnect(this); -// try {Thread.sleep(10);} catch(Exception e) {} -// Protocols.DISCONNECT.disconnect(this); + try Protocols.disconnect.disconnect(self); + std.time.sleep(10000000); + try Protocols.disconnect.disconnect(self); + std.time.sleep(10000000); + try Protocols.disconnect.disconnect(self); self.disconnected = true; self.manager.removeConnection(self); std.log.info("Disconnected", .{});