From d542c21c86a5b0996bdb3152282bf8554ebea3d2 Mon Sep 17 00:00:00 2001 From: IntegratedQuantum Date: Fri, 9 Sep 2022 10:16:45 +0200 Subject: [PATCH] Finish the ConnectionManager and implement the socket in a cross-platform way. --- build.zig | 4 +- include/cross_platform_udp_socket.h | 11 + lib/cross_platform_udp_socket.c | 91 +++++ src/main.zig | 13 +- src/network.zig | 496 ++++++++++++++-------------- src/settings.zig | 3 +- 6 files changed, 362 insertions(+), 256 deletions(-) create mode 100644 include/cross_platform_udp_socket.h create mode 100644 lib/cross_platform_udp_socket.c diff --git a/build.zig b/build.zig index 30959844..22b4630a 100644 --- a/build.zig +++ b/build.zig @@ -21,6 +21,7 @@ pub fn build(b: *std.build.Builder) void { }, &[_][]const u8{"-gdwarf-4", "-std=c99", "-D_GLFW_WIN32"}); exe.linkSystemLibrary("gdi32"); exe.linkSystemLibrary("opengl32"); + exe.linkSystemLibrary("ws2_32"); } else if(target.getOsTag() == .linux) { // TODO: if(isWayland) { // exe.addCSourceFiles(&[_][]const u8 { @@ -37,9 +38,10 @@ pub fn build(b: *std.build.Builder) void { std.log.err("Unsupported target: {}\n", .{ target.getOsTag() }); } } - exe.addCSourceFiles(&[_][]const u8{"lib/glad.c", "lib/stb_image.c"}, &[_][]const u8{"-gdwarf-4",}); + exe.addCSourceFiles(&[_][]const u8{"lib/glad.c", "lib/stb_image.c", "lib/cross_platform_udp_socket.c"}, &[_][]const u8{"-gdwarf-4",}); exe.setTarget(target); exe.setBuildMode(mode); + //exe.sanitize_thread = true; exe.install(); const run_cmd = exe.run(); diff --git a/include/cross_platform_udp_socket.h b/include/cross_platform_udp_socket.h new file mode 100644 index 00000000..11158733 --- /dev/null +++ b/include/cross_platform_udp_socket.h @@ -0,0 +1,11 @@ +#include + +void startup(); + +int init(unsigned short localPort); +int deinit(int socketID); +int sendTo(int socketID, const char* data, uintptr_t size, uint32_t ip, uint16_t port); +intptr_t receiveFrom(int socketID, char* buffer, uintptr_t size, int timeout, uint32_t* resultIP, uint16_t* resultPort); +uint32_t parseIP(const char* ip); + +int getError(); \ No newline at end of file diff --git a/lib/cross_platform_udp_socket.c b/lib/cross_platform_udp_socket.c new file mode 100644 index 00000000..b1c92618 --- /dev/null +++ b/lib/cross_platform_udp_socket.c @@ -0,0 +1,91 @@ +#ifdef _WIN32 +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#endif +#include +#include +#else +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include + +#include + +int checkError(int in) { + // TODO: Print the error here. + return in; +} + +void startup() { +#ifdef _WIN32 + WSADATA d; + if (WSAStartup(MAKEWORD(2, 2), &d)) { + fprintf(stderr, "Failed to initialize.\n"); + } +#endif +} + +int init(unsigned short localPort) { + int socketID = checkError(socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)); + if(socketID == -1) return -1; + struct sockaddr_in bindingAddr; + bindingAddr.sin_family = AF_INET; + bindingAddr.sin_port = htons(localPort); + bindingAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + memset(&bindingAddr.sin_zero, 0, 8); + if(checkError(bind(socketID, (const struct sockaddr*)&bindingAddr, sizeof(bindingAddr))) == -1) { + close(socketID); + return -1; + }; + return socketID; +} + +int deinit(int socketID) { +#ifdef _WIN32 + return checkError(closesocket(socketID)); +#else + return checkError(close(socketID)); +#endif +} + +int sendTo(int socketID, const char* data, uintptr_t size, uint32_t ip, uint16_t port) { + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = ip; + memset(&addr.sin_zero, 0, 8); + return checkError(sendto(socketID, data, size, 0, (const struct sockaddr*)&addr, sizeof(addr))); +} + +intptr_t receiveFrom(int socketID, char* buffer, uintptr_t size, int timeout, uint32_t* resultIP, uint16_t* resultPort) { + struct pollfd pfd = {.fd = socketID, .events = POLLIN}; +#ifdef _WIN32 + intptr_t result = checkError(WSAPoll(&pfd, 1, timeout)); +#else + intptr_t result = checkError(poll(&pfd, 1, timeout)); +#endif + if(result <= 0) return result; + struct sockaddr_in address; + uint32_t addrLen = sizeof(address); + result = checkError(recvfrom(socketID, buffer, size, 0, &address, &addrLen)); + + *resultIP = address.sin_addr.s_addr; + *resultPort = ntohs(address.sin_port); + + return result; +} +uint32_t parseIP(const char* ip) { + return inet_addr(ip); +} + +int getError() { + return errno; +} \ No newline at end of file diff --git a/src/main.zig b/src/main.zig index e42bcd8c..1542c45e 100644 --- a/src/main.zig +++ b/src/main.zig @@ -164,9 +164,17 @@ pub fn main() !void { try assets.loadWorldAssets("saves"); - var conn = try network.ConnectionManager.init(12345, true); + std.log.info("{}", .{threadAllocator}); + + network.init(); + + var conn = try network.ConnectionManager.init(12346, true); defer conn.deinit(); + var conn2 = try network.Connection.init(conn, "127.0.0.1"); + defer conn2.deinit(); + std.log.info("{}", .{threadAllocator}); + c.glEnable(c.GL_CULL_FACE); c.glCullFace(c.GL_BACK); c.glEnable(c.GL_BLEND); @@ -196,9 +204,6 @@ pub fn main() !void { graphics.Draw.line(Vec2f{.x = 0, .y = 0}, Vec2f{.x = 1920, .y = 1080}); } } - - var conn2 = try network.Connection.init(conn, "127.0.0.1:12345"); - conn2.deinit(); } test "abc" { diff --git a/src/network.zig b/src/network.zig index a652d074..b1c1c1ac 100644 --- a/src/network.zig +++ b/src/network.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const Allocator = std.mem.Allocator; const main = @import("main.zig"); const game = @import("game.zig"); @@ -6,61 +7,84 @@ const settings = @import("settings.zig"); //TODO: Might want to use SSL or something similar to encode the message -const LinuxSocket = struct { - const c = @cImport({ - @cInclude("sys/socket.h"); - @cInclude("netinet/in.h"); - @cInclude("sys/types.h"); - @cInclude("unistd.h"); - @cInclude("string.h"); - @cInclude("errno.h"); - @cInclude("stdio.h"); - @cInclude("arpa/inet.h"); - }); - +const Socket = struct { + const c = @cImport({@cInclude("cross_platform_udp_socket.h");}); socketID: u31, - fn checkError(comptime msg: []const u8, result: c_int) !u31 { + fn checkError(comptime msg: []const u8, comptime T: type, result: T) !std.meta.Int(.unsigned, @bitSizeOf(T) - 1) { if(result == -1) { - std.log.warn(msg, .{c.__errno_location().*}); + std.log.warn(msg, .{c.getError()}); return error.SocketError; } - return @intCast(u31, result); + return @intCast(std.meta.Int(.unsigned, @bitSizeOf(T) - 1), result); } - fn init(localPort: u16) !LinuxSocket { - var socketID: u31 = undefined; - socketID = try checkError("Socket creation failed with error: {}", c.socket(c.AF_INET, c.SOCK_DGRAM, c.IPPROTO_UDP)); - errdefer _ = checkError("Error while closing socket: {}", c.close(socketID)) catch 0; - var bindingAddr: c.sockaddr_in = undefined; - bindingAddr.sin_family = c.AF_INET; - bindingAddr.sin_port = c.htons(localPort); - bindingAddr.sin_addr.s_addr = c.inet_addr("127.0.0.1"); - bindingAddr.sin_zero = [_]u8{0} ** 8; - _ = try checkError("Socket binding failed with error: {}", c.bind(socketID, @ptrCast(*c.sockaddr, &bindingAddr), @sizeOf(c.sockaddr_in))); // TODO: Use the next higher port, when the port is already in use. - return LinuxSocket{.socketID = socketID}; + fn init(localPort: u16) !Socket { + return Socket{.socketID = try checkError("Socket creation failed with error: {}", c_int, c.init(localPort))}; } - fn deinit(self: LinuxSocket) void { - _ = checkError("Error while closing socket: {}", c.close(self.socketID)) catch 0; + fn deinit(self: Socket) void { + _ = checkError("Error while closing socket: {}", c_int, c.deinit(self.socketID)) catch 0; + } + + fn send(self: Socket, data: []const u8, destination: Address) !void { + _ = try checkError("Error sending data: {}", isize, c.sendTo(self.socketID, data.ptr, data.len, destination.ip, destination.port)); + } + + fn receive(self: Socket, buffer: []u8, timeout: c_int, resultAddress: *Address) ![]u8 { + var length = try checkError("Receive failed: {}", isize, c.receiveFrom(self.socketID, buffer.ptr, buffer.len, timeout, &resultAddress.ip, &resultAddress.port)); + if(length == 0) return error.Timeout; + return buffer[0..length]; + } + + fn parseIP(ip: [:0]const u8) u32 { + return c.parseIP(ip.ptr); } }; -pub const Address = struct { - ip: []const u8, +pub fn init() void { + Socket.c.startup(); +} + +const Address = struct { + ip: u32, port: u16, }; -pub const ConnectionManager = struct { - socket: LinuxSocket = undefined, - thread: std.Thread = undefined, - online: bool = false, +const Request = struct { + address: Address, + data: []const u8, + requestNotifier: std.Thread.Condition = std.Thread.Condition{}, +}; - pub fn init(localPort: u16, online: bool) !ConnectionManager { - _ = online; //TODO - var result = ConnectionManager{}; - result.socket = try LinuxSocket.init(localPort); - errdefer LinuxSocket.deinit(result.socket); +// private volatile boolean running = true; +pub const ConnectionManager = struct { + socket: Socket = undefined, + thread: std.Thread = undefined, + externalAddress: ?Address = null, + online: bool = false, + running: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true), + + connections: std.ArrayList(*Connection) = undefined, + requests: std.ArrayList(*Request) = undefined, + + gpa: std.heap.GeneralPurposeAllocator(.{}), + allocator: std.mem.Allocator = undefined, + + mutex: std.Thread.Mutex = std.Thread.Mutex{}, + + receiveBuffer: [Connection.maxPacketSize]u8 = undefined, + + pub fn init(localPort: u16, online: bool) !*ConnectionManager { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + var result: *ConnectionManager = try gpa.allocator().create(ConnectionManager); + result.* = ConnectionManager {.gpa = gpa}; + result.allocator = result.gpa.allocator(); + result.connections = std.ArrayList(*Connection).init(result.allocator); + result.requests = std.ArrayList(*Request).init(result.allocator); + + result.socket = try Socket.init(localPort); + errdefer Socket.deinit(result.socket); result.thread = try std.Thread.spawn(.{}, run, .{result}); if(online) { @@ -69,9 +93,25 @@ pub const ConnectionManager = struct { return result; } - pub fn deinit(self: ConnectionManager) void { - LinuxSocket.deinit(self.socket); + pub fn deinit(self: *ConnectionManager) void { + self.running.store(false, .Monotonic); self.thread.join(); + Socket.deinit(self.socket); + + for(self.connections.items) |conn| { + conn.disconnect(); + } + self.connections.deinit(); + for(self.requests.items) |request| { + request.requestNotifier.signal(); + } + self.requests.deinit(); + + var gpa = self.gpa; + gpa.allocator().destroy(self); + if(gpa.deinit()) { + @panic("Memory leak in connection."); + } } pub fn makeOnline(self: *ConnectionManager) void { @@ -95,180 +135,139 @@ pub const ConnectionManager = struct { } } - pub fn run(self: ConnectionManager) void { + pub fn send(self: *ConnectionManager, data: []const u8, target: Address) !void { + try self.socket.send(data, target); + } + + pub fn sendRequest(self: *ConnectionManager, allocator: Allocator, data: []const u8, target: Address, timeout_ns: u64) ?[]const u8 { + self.send(data, target); + var request = Request{.address = target, .data = data}; + { + self.mutex.lock(); + defer self.mutex.unlock(); + self.requests.append(&request); + + request.requestNotifier.timedWait(self.mutex, timeout_ns) catch {}; + + for(self.requests.items) |req, i| { + if(req == request) { + _ = self.requests.swapRemove(i); + break; + } + } + } + + // The request data gets modified when a result was received. + if(request.data == data) { + return null; + } else { + if(allocator == self.allocator) { + return request.data; + } else { + var result = allocator.dupe(request.data); + self.allocator.free(request.data); + return result; + } + } + } + + pub fn addConnection(self: *ConnectionManager, conn: *Connection) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + try self.connections.append(conn); + } + + pub fn removeConnection(self: *ConnectionManager, conn: *Connection) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + for(self.connections.items) |other, i| { + if(other == conn) { + _ = self.connections.swapRemove(i); + break; + } + } + } + + fn onReceive(self: *ConnectionManager, data: []const u8, source: Address) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + for(self.connections.items) |conn| { + if(conn.remoteAddress.ip == source.ip) { + if(conn.bruteforcingPort) { + conn.remoteAddress.port = source.port; + conn.bruteforcingPort = false; + } + if(conn.remoteAddress.port == source.port) { + try conn.receive(data); + return; + } + } + } + // Check if it's part of an active request: + for(self.requests.items) |request| { + if(request.address.ip == source.ip and request.address.port == source.port) { + request.data = try self.allocator.dupe(u8, data); + request.requestNotifier.signal(); + return; + } + } + if(self.externalAddress != null and source.ip == self.externalAddress.?.ip and source.port == self.externalAddress.?.port) return; + // TODO: Reduce the number of false alarms in the short period after a disconnect. + std.log.warn("Unknown connection from address: {}", .{source}); + std.log.debug("Message: {any}", .{data}); + } + + pub fn run(self: *ConnectionManager) !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; main.threadAllocator = gpa.allocator(); defer if(gpa.deinit()) { @panic("Memory leak"); }; - _ = self; // TODO - } + var lastTime = std.time.milliTimestamp(); + while(self.running.load(.Monotonic)) { + var source: Address = undefined; + if(self.socket.receive(&self.receiveBuffer, 100, &source)) |data| { + try self.onReceive(data, source); + } else |err| { + if(err == error.Timeout) { + // No message within the last ~100 ms. + } else { + return err; // TODO: Shutdown the game normally. + } + } - pub fn send(self: ConnectionManager, data: []const u8, target: Address) void { - // TODO - _ = self; - _ = data; - _ = target; + // Send a keep-alive packet roughly every 100 ms: + if(std.time.milliTimestamp() -% lastTime > 100) { + lastTime = std.time.milliTimestamp(); + var i: u32 = 0; + self.mutex.lock(); + defer self.mutex.unlock(); + while(i < self.connections.items.len) { + var conn = self.connections.items[i]; + if(lastTime -% conn.lastConnection > settings.connectionTimeout and conn.isConnected()) { + std.log.info("timeout", .{}); + // Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times). + self.mutex.unlock(); + conn.disconnect(); + self.mutex.lock(); + } else { + try conn.sendKeepAlive(); + i += 1; + } + } + if(self.connections.items.len == 0 and self.externalAddress != null) { + // Send a message to external ip, to keep the port open: + var data: [0]u8 = undefined; + try self.send(&data, self.externalAddress.?); + } + } + } } }; -// sockID = c.socket(c.AF_INET, c.SOCK_DGRAM, c.IPPROTO_UDP); -// defer _ = c.close(sockID); -// _ = c.memset(&otherAddr, 0, @sizeOf(c.sockaddr_in)); -// otherAddr.sin_family = c.AF_INET; -// otherAddr.sin_port = c.htons(40001); -// otherAddr.sin_addr.s_addr = c.inet_addr("???.???.???.???"); -// var myAddr: c.sockaddr_in = undefined; -// _ = c.memset(&myAddr, 0, @sizeOf(c.sockaddr_in)); -// myAddr.sin_family = c.AF_INET; -// myAddr.sin_port = c.htons(40001); -// myAddr.sin_addr.s_addr = c.inet_addr("192.168.178.60"); -// -// _ = errorCheck(c.bind(sockID, @ptrCast(*c.sockaddr, &myAddr), @sizeOf(c.sockaddr_in))); -// -// _ = std.Thread.spawn(.{}, keepAlive, .{}) catch null; -//public final class UDPConnectionManager extends Thread { -// private final DatagramPacket receivedPacket; -// public final ArrayList connections = new ArrayList<>(); -// private final ArrayList requests = new ArrayList<>(); -// private volatile boolean running = true; -// public String externalIPPort = null; -// private InetAddress externalAddress = null; -// private int externalPort = 0; -// -// public void send(DatagramPacket packet) { -// try { -// socket.send(packet); -// } catch(IOException e) { -// Logger.error(e); -// } -// } -// -// public byte[] sendRequest(DatagramPacket packet, long timeout) { -// send(packet); -// byte[] request = packet.getData(); -// synchronized(requests) { -// requests.add(packet); -// } -// synchronized(packet) { -// try { -// packet.wait(timeout); -// } catch(InterruptedException e) {} -// } -// synchronized(requests) { -// requests.remove(packet); -// } -// if(packet.getData() == request) { -// return null; -// } else { -// return packet.getData(); -// } -// } -// -// public void addConnection(UDPConnection connection) { -// synchronized(connections) { -// connections.add(connection); -// } -// } -// -// public void removeConnection(UDPConnection connection) { -// synchronized(connections) { -// connections.remove(connection); -// } -// } -// -// public void cleanup() { -// while(!connections.isEmpty()) { -// connections.get(0).disconnect(); -// } -// running = false; -// if(Thread.currentThread() != this) { -// interrupt(); -// try { -// join(); -// } catch(InterruptedException e) { -// Logger.error(e); -// } -// } -// socket.close(); -// } -// -// private void onReceive() { -// byte[] data = receivedPacket.getData(); -// int len = receivedPacket.getLength(); -// InetAddress addr = receivedPacket.getAddress(); -// int port = receivedPacket.getPort(); -// for(UDPConnection connection : connections) { -// if(connection.remoteAddress.equals(addr)) { -// if(connection.bruteforcingPort) { // brute-forcing the port was successful. -// connection.remotePort = port; -// connection.bruteforcingPort = false; -// } -// if(connection.remotePort == port) { -// connection.receive(data, len); -// return; -// } -// } -// } -// // Check if it's part of an active request: -// synchronized(requests) { -// for(DatagramPacket packet : requests) { -// if(packet.getAddress().equals(addr) && packet.getPort() == port) { -// packet.setData(Arrays.copyOf(data, len)); -// synchronized(packet) { -// packet.notify(); -// } -// return; -// } -// } -// } -// if(addr.equals(externalAddress) && port == externalPort) return; -// if(addr.toString().contains("127.0.0.1")) return; -// Logger.warning("Unknown connection from address: " + addr+":"+port); -// Logger.debug("Message: "+Arrays.toString(Arrays.copyOf(data, len))); -// } -// -// @Override -// public void run() { -// assert Thread.currentThread() == this : "UDPConnectionManager.run() shouldn't be called by anyone."; -// try { -// socket.setSoTimeout(100); -// long lastTime = System.currentTimeMillis(); -// while (running) { -// try { -// socket.receive(receivedPacket); -// onReceive(); -// } catch(SocketTimeoutException e) { -// // No message within the last ~100 ms. -// } -// -// // Send a keep-alive packet roughly every 100 ms: -// if(System.currentTimeMillis() - lastTime > 100 && running) { -// lastTime = System.currentTimeMillis(); -// for(UDPConnection connection : connections.toArray(new UDPConnection[0])) { -// if(lastTime - connection.lastConnection > CONNECTION_TIMEOUT && connection.isConnected()) { -// Logger.info("timeout"); -// // Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times). -// connection.disconnect(); -// } else { -// connection.sendKeepAlive(); -// } -// } -// if(connections.isEmpty() && externalAddress != null) { -// // Send a message to external ip, to keep the port open: -// DatagramPacket packet = new DatagramPacket(new byte[0], 0); -// packet.setAddress(externalAddress); -// packet.setPort(externalPort); -// packet.setLength(0); -// send(packet); -// } -// } -// } -// } catch (Exception e) { -// Logger.crash(e); -// } -// } -//} const UnconfirmedPacket = struct { data: []const u8, @@ -292,7 +291,7 @@ pub const Connection = struct { var packetsSent: u32 = 0; var packetsResent: u32 = 0; - manager: ConnectionManager, + manager: *ConnectionManager, gpa: std.heap.GeneralPurposeAllocator(.{}), allocator: std.mem.Allocator, @@ -304,8 +303,8 @@ pub const Connection = struct { streamBuffer: [maxImportantPacketSize]u8 = undefined, streamPosition: u32 = importantHeaderSize, messageID: u32 = 0, - unconfirmedPackets: std.ArrayList(UnconfirmedPacket), - receivedPackets: [3]std.ArrayList(u32), + unconfirmedPackets: std.ArrayList(UnconfirmedPacket) = undefined, + receivedPackets: [3]std.ArrayList(u32) = undefined, lastReceivedPackets: [65536]?[]const u8 = undefined, lastIndex: u32 = 0, @@ -317,11 +316,11 @@ pub const Connection = struct { disconnected: bool = false, handShakeComplete: bool = false, - lastConnection: i64 = 0, + lastConnection: i64, mutex: std.Thread.Mutex = std.Thread.Mutex{}, - pub fn init(manager: ConnectionManager, ipPort: []const u8) !*Connection { + pub fn init(manager: *ConnectionManager, ipPort: []const u8) !*Connection { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; var result: *Connection = try gpa.allocator().create(Connection); result.* = Connection { @@ -329,16 +328,19 @@ pub const Connection = struct { .gpa = gpa, .allocator = undefined, .remoteAddress = undefined, - .unconfirmedPackets = std.ArrayList(UnconfirmedPacket).init(gpa.allocator()), - .receivedPackets = [3]std.ArrayList(u32){ - std.ArrayList(u32).init(gpa.allocator()), - std.ArrayList(u32).init(gpa.allocator()), - std.ArrayList(u32).init(gpa.allocator()), - }, + .lastConnection = std.time.milliTimestamp(), }; result.allocator = result.gpa.allocator(); // The right reference(the one that isn't on the stack) needs to be used passed! + result.unconfirmedPackets = std.ArrayList(UnconfirmedPacket).init(result.allocator); + result.receivedPackets = [3]std.ArrayList(u32){ + std.ArrayList(u32).init(result.allocator), + std.ArrayList(u32).init(result.allocator), + std.ArrayList(u32).init(result.allocator), + }; var splitter = std.mem.split(u8, ipPort, ":"); - result.remoteAddress.ip = try result.allocator.dupe(u8, splitter.first()); + var nullTerminatedIP = try main.threadAllocator.dupeZ(u8, splitter.first()); + defer main.threadAllocator.free(nullTerminatedIP); + result.remoteAddress.ip = Socket.parseIP(nullTerminatedIP); var port = splitter.rest(); if(port.len != 0 and port[0] == '?') { result.bruteforcingPort = true; @@ -349,16 +351,16 @@ pub const Connection = struct { break :blk settings.defaultPort; }; - // TODO: manager.addConnection(this); + try result.manager.addConnection(result); return result; } pub fn deinit(self: *Connection) void { + self.disconnect(); self.unconfirmedPackets.deinit(); self.receivedPackets[0].deinit(); self.receivedPackets[1].deinit(); self.receivedPackets[2].deinit(); - self.allocator.free(self.remoteAddress.ip); var gpa = self.gpa; gpa.allocator().destroy(self); if(gpa.deinit()) { @@ -367,9 +369,6 @@ pub const Connection = struct { } fn flush(self: *Connection) !void { - self.mutex.lock(); - defer self.mutex.unlock(); - if(self.streamPosition == importantHeaderSize) return; // Don't send empty packets. // Fill the header: self.streamBuffer[0] = Protocol.important; @@ -384,7 +383,7 @@ pub const Connection = struct { }; try self.unconfirmedPackets.append(packet); packetsSent += 1; - self.manager.send(packet.data, self.remoteAddress); + try self.manager.send(packet.data, self.remoteAddress); self.streamPosition = importantHeaderSize; } @@ -455,7 +454,7 @@ pub const Connection = struct { } } - fn sendKeepAlive(self: *Connection) void { + fn sendKeepAlive(self: *Connection) !void { self.mutex.lock(); defer self.mutex.unlock(); @@ -468,22 +467,22 @@ pub const Connection = struct { for(list.items) |packetID| { var leftRegion: ?u32 = null; var rightRegion: ?u32 = null; - for(runLengthEncodingStarts) |start, reg| { + for(runLengthEncodingStarts.items) |start, reg| { var diff = packetID -% start; if(diff < runLengthEncodingLengths.items[reg]) continue; if(diff == runLengthEncodingLengths.items[reg]) { - leftRegion = reg; + leftRegion = @intCast(u32, reg); } if(diff == std.math.maxInt(u32)) { - rightRegion == reg; + rightRegion = @intCast(u32, reg); } } if(leftRegion) |left| { if(rightRegion) |right| { // Needs to combine the regions: runLengthEncodingLengths.items[left] += runLengthEncodingLengths.items[right] + 1; - runLengthEncodingStarts.swapRemove(right); - runLengthEncodingLengths.swapRemove(right); + _ = runLengthEncodingStarts.swapRemove(right); + _ = runLengthEncodingLengths.swapRemove(right); } else { runLengthEncodingLengths.items[left] += 1; } @@ -511,34 +510,34 @@ pub const Connection = struct { std.mem.writeIntBig(u32, output[1..5], self.lastKeepAliveSent); self.lastKeepAliveSent += 1; std.mem.writeIntBig(u32, output[5..9], self.otherKeepAliveReceived); - var remaining: []const u8 = output[9..]; - for(runLengthEncodingStarts) |_, i| { - std.mem.writeIntBig(u32, remaining[0..4], self.runLengthEncodingStarts.items[i]); - std.mem.writeIntBig(u32, remaining[4..8], self.runLengthEncodingLengths.items[i]); + var remaining: []u8 = output[9..]; + for(runLengthEncodingStarts.items) |_, i| { + std.mem.writeIntBig(u32, remaining[0..4], runLengthEncodingStarts.items[i]); + std.mem.writeIntBig(u32, remaining[4..8], runLengthEncodingLengths.items[i]); remaining = remaining[8..]; } - self.manager.send(output, self.remoteAddress); + try self.manager.send(output, self.remoteAddress); // Resend packets that didn't receive confirmation within the last 2 keep-alive signals. for(self.unconfirmedPackets.items) |*packet| { if(self.lastKeepAliveReceived - packet.lastKeepAliveSentBefore >= 2) { packetsSent += 1; packetsResent += 1; - self.manager.send(packet.data, self.remoteAddress); + try self.manager.send(packet.data, self.remoteAddress); packet.lastKeepAliveSentBefore = self.lastKeepAliveSent; } } - self.flush(); + try self.flush(); if(self.bruteforcingPort) { // This is called every 100 ms, so if I send 10 requests it shouldn't be too bad. var i: u16 = 0; while(i < 5): (i += 1) { var data = [0]u8{}; if(self.remoteAddress.port +% self.bruteForcedPortRange != 0) { - self.manager.send(data, Address{self.remoteAddress.ip, self.remoteAddress.port +% self.bruteForcedPortRange}); + try self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port +% self.bruteForcedPortRange}); } if(self.remoteAddress.port - self.bruteForcedPortRange != 0) { - self.manager.send(data, Address{self.remoteAddress.ip, self.remoteAddress.port -% self.bruteForcedPortRange}); + try self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port -% self.bruteForcedPortRange}); } self.bruteForcedPortRange +%= 1; } @@ -569,7 +568,7 @@ pub const Connection = struct { // Determine the next packet length: var len: u32 = 0; - var shift: u32 = 0; + var shift: u5 = 0; while(true) { if(newIndex == receivedPacket.len) { newIndex = 0; @@ -578,7 +577,7 @@ pub const Connection = struct { } var nextByte = receivedPacket[newIndex]; newIndex += 1; - len |= (nextByte & 0x7f) << shift; + len |= @intCast(u32, nextByte & 0x7f) << shift; if(nextByte & 0x80 != 0) { shift += 7; } else { @@ -601,7 +600,7 @@ pub const Connection = struct { while(remaining.len != 0) { dataAvailable = @minimum(self.lastReceivedPackets[id & 65535].?.len - newIndex, remaining.len); std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..dataAvailable]); - newIndex += dataAvailable; + newIndex += @intCast(u32, dataAvailable); remaining = remaining[dataAvailable..]; if(newIndex == self.lastReceivedPackets[id & 65535].?.len) { id += 1; @@ -619,10 +618,7 @@ pub const Connection = struct { } } - pub fn receive(self: *Connection, data: []const u8) void { - self.mutex.lock(); - defer self.mutex.unlock(); - + pub fn receive(self: *Connection, data: []const u8) !void { const protocol = data[0]; // TODO: //if(!self.handShakeComplete and protocol != Protocols.HANDSHAKE.id and protocol != Protocol.KEEP_ALIVE and protocol != Protocol.important) { @@ -660,9 +656,9 @@ pub const Connection = struct { if(id < self.lastIncompletePacket or self.lastReceivedPackets[id & 65535] != null) { return; // Already received the package in the past. } - self.lastReceivedPackets[id & 65535] = self.allocator.dupe(data[importantHeaderSize..]); + self.lastReceivedPackets[id & 65535] = try self.allocator.dupe(u8, data[importantHeaderSize..]); // Check if a message got completed: - self.collectPackets(); + try self.collectPackets(); } else if(protocol == Protocol.keepAlive) { self.receiveKeepAlive(data[1..]); } else { @@ -680,7 +676,7 @@ pub const Connection = struct { // try {Thread.sleep(10);} catch(Exception e) {} // Protocols.DISCONNECT.disconnect(this); self.disconnected = true; - // TODO: manager.removeConnection(self); - std.log.info("Disconnected"); + self.manager.removeConnection(self); + std.log.info("Disconnected", .{}); } }; \ No newline at end of file diff --git a/src/settings.zig b/src/settings.zig index a97dce3f..7610f9ba 100644 --- a/src/settings.zig +++ b/src/settings.zig @@ -1,3 +1,4 @@ -pub const defaultPort: u16 = 47649; \ No newline at end of file +pub const defaultPort: u16 = 47649; +pub const connectionTimeout = 60000; \ No newline at end of file