Finish the ConnectionManager and implement the socket in a cross-platform way.

This commit is contained in:
IntegratedQuantum 2022-09-09 10:16:45 +02:00
parent bebc007d74
commit d542c21c86
6 changed files with 362 additions and 256 deletions

View File

@ -21,6 +21,7 @@ pub fn build(b: *std.build.Builder) void {
}, &[_][]const u8{"-gdwarf-4", "-std=c99", "-D_GLFW_WIN32"});
exe.linkSystemLibrary("gdi32");
exe.linkSystemLibrary("opengl32");
exe.linkSystemLibrary("ws2_32");
} else if(target.getOsTag() == .linux) {
// TODO: if(isWayland) {
// exe.addCSourceFiles(&[_][]const u8 {
@ -37,9 +38,10 @@ pub fn build(b: *std.build.Builder) void {
std.log.err("Unsupported target: {}\n", .{ target.getOsTag() });
}
}
exe.addCSourceFiles(&[_][]const u8{"lib/glad.c", "lib/stb_image.c"}, &[_][]const u8{"-gdwarf-4",});
exe.addCSourceFiles(&[_][]const u8{"lib/glad.c", "lib/stb_image.c", "lib/cross_platform_udp_socket.c"}, &[_][]const u8{"-gdwarf-4",});
exe.setTarget(target);
exe.setBuildMode(mode);
//exe.sanitize_thread = true;
exe.install();
const run_cmd = exe.run();

View File

@ -0,0 +1,11 @@
#include <stdint.h>
void startup();
int init(unsigned short localPort);
int deinit(int socketID);
int sendTo(int socketID, const char* data, uintptr_t size, uint32_t ip, uint16_t port);
intptr_t receiveFrom(int socketID, char* buffer, uintptr_t size, int timeout, uint32_t* resultIP, uint16_t* resultPort);
uint32_t parseIP(const char* ip);
int getError();

View File

@ -0,0 +1,91 @@
#ifdef _WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <poll.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#endif
#include <stdio.h>
#include <cross_platform_udp_socket.h>
int checkError(int in) {
// TODO: Print the error here.
return in;
}
void startup() {
#ifdef _WIN32
WSADATA d;
if (WSAStartup(MAKEWORD(2, 2), &d)) {
fprintf(stderr, "Failed to initialize.\n");
}
#endif
}
int init(unsigned short localPort) {
int socketID = checkError(socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP));
if(socketID == -1) return -1;
struct sockaddr_in bindingAddr;
bindingAddr.sin_family = AF_INET;
bindingAddr.sin_port = htons(localPort);
bindingAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
memset(&bindingAddr.sin_zero, 0, 8);
if(checkError(bind(socketID, (const struct sockaddr*)&bindingAddr, sizeof(bindingAddr))) == -1) {
close(socketID);
return -1;
};
return socketID;
}
int deinit(int socketID) {
#ifdef _WIN32
return checkError(closesocket(socketID));
#else
return checkError(close(socketID));
#endif
}
int sendTo(int socketID, const char* data, uintptr_t size, uint32_t ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = ip;
memset(&addr.sin_zero, 0, 8);
return checkError(sendto(socketID, data, size, 0, (const struct sockaddr*)&addr, sizeof(addr)));
}
intptr_t receiveFrom(int socketID, char* buffer, uintptr_t size, int timeout, uint32_t* resultIP, uint16_t* resultPort) {
struct pollfd pfd = {.fd = socketID, .events = POLLIN};
#ifdef _WIN32
intptr_t result = checkError(WSAPoll(&pfd, 1, timeout));
#else
intptr_t result = checkError(poll(&pfd, 1, timeout));
#endif
if(result <= 0) return result;
struct sockaddr_in address;
uint32_t addrLen = sizeof(address);
result = checkError(recvfrom(socketID, buffer, size, 0, &address, &addrLen));
*resultIP = address.sin_addr.s_addr;
*resultPort = ntohs(address.sin_port);
return result;
}
uint32_t parseIP(const char* ip) {
return inet_addr(ip);
}
int getError() {
return errno;
}

View File

@ -164,9 +164,17 @@ pub fn main() !void {
try assets.loadWorldAssets("saves");
var conn = try network.ConnectionManager.init(12345, true);
std.log.info("{}", .{threadAllocator});
network.init();
var conn = try network.ConnectionManager.init(12346, true);
defer conn.deinit();
var conn2 = try network.Connection.init(conn, "127.0.0.1");
defer conn2.deinit();
std.log.info("{}", .{threadAllocator});
c.glEnable(c.GL_CULL_FACE);
c.glCullFace(c.GL_BACK);
c.glEnable(c.GL_BLEND);
@ -196,9 +204,6 @@ pub fn main() !void {
graphics.Draw.line(Vec2f{.x = 0, .y = 0}, Vec2f{.x = 1920, .y = 1080});
}
}
var conn2 = try network.Connection.init(conn, "127.0.0.1:12345");
conn2.deinit();
}
test "abc" {

View File

@ -1,4 +1,5 @@
const std = @import("std");
const Allocator = std.mem.Allocator;
const main = @import("main.zig");
const game = @import("game.zig");
@ -6,61 +7,84 @@ const settings = @import("settings.zig");
//TODO: Might want to use SSL or something similar to encode the message
const LinuxSocket = struct {
const c = @cImport({
@cInclude("sys/socket.h");
@cInclude("netinet/in.h");
@cInclude("sys/types.h");
@cInclude("unistd.h");
@cInclude("string.h");
@cInclude("errno.h");
@cInclude("stdio.h");
@cInclude("arpa/inet.h");
});
const Socket = struct {
const c = @cImport({@cInclude("cross_platform_udp_socket.h");});
socketID: u31,
fn checkError(comptime msg: []const u8, result: c_int) !u31 {
fn checkError(comptime msg: []const u8, comptime T: type, result: T) !std.meta.Int(.unsigned, @bitSizeOf(T) - 1) {
if(result == -1) {
std.log.warn(msg, .{c.__errno_location().*});
std.log.warn(msg, .{c.getError()});
return error.SocketError;
}
return @intCast(u31, result);
return @intCast(std.meta.Int(.unsigned, @bitSizeOf(T) - 1), result);
}
fn init(localPort: u16) !LinuxSocket {
var socketID: u31 = undefined;
socketID = try checkError("Socket creation failed with error: {}", c.socket(c.AF_INET, c.SOCK_DGRAM, c.IPPROTO_UDP));
errdefer _ = checkError("Error while closing socket: {}", c.close(socketID)) catch 0;
var bindingAddr: c.sockaddr_in = undefined;
bindingAddr.sin_family = c.AF_INET;
bindingAddr.sin_port = c.htons(localPort);
bindingAddr.sin_addr.s_addr = c.inet_addr("127.0.0.1");
bindingAddr.sin_zero = [_]u8{0} ** 8;
_ = try checkError("Socket binding failed with error: {}", c.bind(socketID, @ptrCast(*c.sockaddr, &bindingAddr), @sizeOf(c.sockaddr_in))); // TODO: Use the next higher port, when the port is already in use.
return LinuxSocket{.socketID = socketID};
fn init(localPort: u16) !Socket {
return Socket{.socketID = try checkError("Socket creation failed with error: {}", c_int, c.init(localPort))};
}
fn deinit(self: LinuxSocket) void {
_ = checkError("Error while closing socket: {}", c.close(self.socketID)) catch 0;
fn deinit(self: Socket) void {
_ = checkError("Error while closing socket: {}", c_int, c.deinit(self.socketID)) catch 0;
}
fn send(self: Socket, data: []const u8, destination: Address) !void {
_ = try checkError("Error sending data: {}", isize, c.sendTo(self.socketID, data.ptr, data.len, destination.ip, destination.port));
}
fn receive(self: Socket, buffer: []u8, timeout: c_int, resultAddress: *Address) ![]u8 {
var length = try checkError("Receive failed: {}", isize, c.receiveFrom(self.socketID, buffer.ptr, buffer.len, timeout, &resultAddress.ip, &resultAddress.port));
if(length == 0) return error.Timeout;
return buffer[0..length];
}
fn parseIP(ip: [:0]const u8) u32 {
return c.parseIP(ip.ptr);
}
};
pub const Address = struct {
ip: []const u8,
pub fn init() void {
Socket.c.startup();
}
const Address = struct {
ip: u32,
port: u16,
};
pub const ConnectionManager = struct {
socket: LinuxSocket = undefined,
thread: std.Thread = undefined,
online: bool = false,
const Request = struct {
address: Address,
data: []const u8,
requestNotifier: std.Thread.Condition = std.Thread.Condition{},
};
pub fn init(localPort: u16, online: bool) !ConnectionManager {
_ = online; //TODO
var result = ConnectionManager{};
result.socket = try LinuxSocket.init(localPort);
errdefer LinuxSocket.deinit(result.socket);
// private volatile boolean running = true;
pub const ConnectionManager = struct {
socket: Socket = undefined,
thread: std.Thread = undefined,
externalAddress: ?Address = null,
online: bool = false,
running: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
connections: std.ArrayList(*Connection) = undefined,
requests: std.ArrayList(*Request) = undefined,
gpa: std.heap.GeneralPurposeAllocator(.{}),
allocator: std.mem.Allocator = undefined,
mutex: std.Thread.Mutex = std.Thread.Mutex{},
receiveBuffer: [Connection.maxPacketSize]u8 = undefined,
pub fn init(localPort: u16, online: bool) !*ConnectionManager {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var result: *ConnectionManager = try gpa.allocator().create(ConnectionManager);
result.* = ConnectionManager {.gpa = gpa};
result.allocator = result.gpa.allocator();
result.connections = std.ArrayList(*Connection).init(result.allocator);
result.requests = std.ArrayList(*Request).init(result.allocator);
result.socket = try Socket.init(localPort);
errdefer Socket.deinit(result.socket);
result.thread = try std.Thread.spawn(.{}, run, .{result});
if(online) {
@ -69,9 +93,25 @@ pub const ConnectionManager = struct {
return result;
}
pub fn deinit(self: ConnectionManager) void {
LinuxSocket.deinit(self.socket);
pub fn deinit(self: *ConnectionManager) void {
self.running.store(false, .Monotonic);
self.thread.join();
Socket.deinit(self.socket);
for(self.connections.items) |conn| {
conn.disconnect();
}
self.connections.deinit();
for(self.requests.items) |request| {
request.requestNotifier.signal();
}
self.requests.deinit();
var gpa = self.gpa;
gpa.allocator().destroy(self);
if(gpa.deinit()) {
@panic("Memory leak in connection.");
}
}
pub fn makeOnline(self: *ConnectionManager) void {
@ -95,180 +135,139 @@ pub const ConnectionManager = struct {
}
}
pub fn run(self: ConnectionManager) void {
pub fn send(self: *ConnectionManager, data: []const u8, target: Address) !void {
try self.socket.send(data, target);
}
pub fn sendRequest(self: *ConnectionManager, allocator: Allocator, data: []const u8, target: Address, timeout_ns: u64) ?[]const u8 {
self.send(data, target);
var request = Request{.address = target, .data = data};
{
self.mutex.lock();
defer self.mutex.unlock();
self.requests.append(&request);
request.requestNotifier.timedWait(self.mutex, timeout_ns) catch {};
for(self.requests.items) |req, i| {
if(req == request) {
_ = self.requests.swapRemove(i);
break;
}
}
}
// The request data gets modified when a result was received.
if(request.data == data) {
return null;
} else {
if(allocator == self.allocator) {
return request.data;
} else {
var result = allocator.dupe(request.data);
self.allocator.free(request.data);
return result;
}
}
}
pub fn addConnection(self: *ConnectionManager, conn: *Connection) !void {
self.mutex.lock();
defer self.mutex.unlock();
try self.connections.append(conn);
}
pub fn removeConnection(self: *ConnectionManager, conn: *Connection) void {
self.mutex.lock();
defer self.mutex.unlock();
for(self.connections.items) |other, i| {
if(other == conn) {
_ = self.connections.swapRemove(i);
break;
}
}
}
fn onReceive(self: *ConnectionManager, data: []const u8, source: Address) !void {
self.mutex.lock();
defer self.mutex.unlock();
for(self.connections.items) |conn| {
if(conn.remoteAddress.ip == source.ip) {
if(conn.bruteforcingPort) {
conn.remoteAddress.port = source.port;
conn.bruteforcingPort = false;
}
if(conn.remoteAddress.port == source.port) {
try conn.receive(data);
return;
}
}
}
// Check if it's part of an active request:
for(self.requests.items) |request| {
if(request.address.ip == source.ip and request.address.port == source.port) {
request.data = try self.allocator.dupe(u8, data);
request.requestNotifier.signal();
return;
}
}
if(self.externalAddress != null and source.ip == self.externalAddress.?.ip and source.port == self.externalAddress.?.port) return;
// TODO: Reduce the number of false alarms in the short period after a disconnect.
std.log.warn("Unknown connection from address: {}", .{source});
std.log.debug("Message: {any}", .{data});
}
pub fn run(self: *ConnectionManager) !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
main.threadAllocator = gpa.allocator();
defer if(gpa.deinit()) {
@panic("Memory leak");
};
_ = self; // TODO
}
var lastTime = std.time.milliTimestamp();
while(self.running.load(.Monotonic)) {
var source: Address = undefined;
if(self.socket.receive(&self.receiveBuffer, 100, &source)) |data| {
try self.onReceive(data, source);
} else |err| {
if(err == error.Timeout) {
// No message within the last ~100 ms.
} else {
return err; // TODO: Shutdown the game normally.
}
}
pub fn send(self: ConnectionManager, data: []const u8, target: Address) void {
// TODO
_ = self;
_ = data;
_ = target;
// Send a keep-alive packet roughly every 100 ms:
if(std.time.milliTimestamp() -% lastTime > 100) {
lastTime = std.time.milliTimestamp();
var i: u32 = 0;
self.mutex.lock();
defer self.mutex.unlock();
while(i < self.connections.items.len) {
var conn = self.connections.items[i];
if(lastTime -% conn.lastConnection > settings.connectionTimeout and conn.isConnected()) {
std.log.info("timeout", .{});
// Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times).
self.mutex.unlock();
conn.disconnect();
self.mutex.lock();
} else {
try conn.sendKeepAlive();
i += 1;
}
}
if(self.connections.items.len == 0 and self.externalAddress != null) {
// Send a message to external ip, to keep the port open:
var data: [0]u8 = undefined;
try self.send(&data, self.externalAddress.?);
}
}
}
}
};
// sockID = c.socket(c.AF_INET, c.SOCK_DGRAM, c.IPPROTO_UDP);
// defer _ = c.close(sockID);
// _ = c.memset(&otherAddr, 0, @sizeOf(c.sockaddr_in));
// otherAddr.sin_family = c.AF_INET;
// otherAddr.sin_port = c.htons(40001);
// otherAddr.sin_addr.s_addr = c.inet_addr("???.???.???.???");
// var myAddr: c.sockaddr_in = undefined;
// _ = c.memset(&myAddr, 0, @sizeOf(c.sockaddr_in));
// myAddr.sin_family = c.AF_INET;
// myAddr.sin_port = c.htons(40001);
// myAddr.sin_addr.s_addr = c.inet_addr("192.168.178.60");
//
// _ = errorCheck(c.bind(sockID, @ptrCast(*c.sockaddr, &myAddr), @sizeOf(c.sockaddr_in)));
//
// _ = std.Thread.spawn(.{}, keepAlive, .{}) catch null;
//public final class UDPConnectionManager extends Thread {
// private final DatagramPacket receivedPacket;
// public final ArrayList<UDPConnection> connections = new ArrayList<>();
// private final ArrayList<DatagramPacket> requests = new ArrayList<>();
// private volatile boolean running = true;
// public String externalIPPort = null;
// private InetAddress externalAddress = null;
// private int externalPort = 0;
//
// public void send(DatagramPacket packet) {
// try {
// socket.send(packet);
// } catch(IOException e) {
// Logger.error(e);
// }
// }
//
// public byte[] sendRequest(DatagramPacket packet, long timeout) {
// send(packet);
// byte[] request = packet.getData();
// synchronized(requests) {
// requests.add(packet);
// }
// synchronized(packet) {
// try {
// packet.wait(timeout);
// } catch(InterruptedException e) {}
// }
// synchronized(requests) {
// requests.remove(packet);
// }
// if(packet.getData() == request) {
// return null;
// } else {
// return packet.getData();
// }
// }
//
// public void addConnection(UDPConnection connection) {
// synchronized(connections) {
// connections.add(connection);
// }
// }
//
// public void removeConnection(UDPConnection connection) {
// synchronized(connections) {
// connections.remove(connection);
// }
// }
//
// public void cleanup() {
// while(!connections.isEmpty()) {
// connections.get(0).disconnect();
// }
// running = false;
// if(Thread.currentThread() != this) {
// interrupt();
// try {
// join();
// } catch(InterruptedException e) {
// Logger.error(e);
// }
// }
// socket.close();
// }
//
// private void onReceive() {
// byte[] data = receivedPacket.getData();
// int len = receivedPacket.getLength();
// InetAddress addr = receivedPacket.getAddress();
// int port = receivedPacket.getPort();
// for(UDPConnection connection : connections) {
// if(connection.remoteAddress.equals(addr)) {
// if(connection.bruteforcingPort) { // brute-forcing the port was successful.
// connection.remotePort = port;
// connection.bruteforcingPort = false;
// }
// if(connection.remotePort == port) {
// connection.receive(data, len);
// return;
// }
// }
// }
// // Check if it's part of an active request:
// synchronized(requests) {
// for(DatagramPacket packet : requests) {
// if(packet.getAddress().equals(addr) && packet.getPort() == port) {
// packet.setData(Arrays.copyOf(data, len));
// synchronized(packet) {
// packet.notify();
// }
// return;
// }
// }
// }
// if(addr.equals(externalAddress) && port == externalPort) return;
// if(addr.toString().contains("127.0.0.1")) return;
// Logger.warning("Unknown connection from address: " + addr+":"+port);
// Logger.debug("Message: "+Arrays.toString(Arrays.copyOf(data, len)));
// }
//
// @Override
// public void run() {
// assert Thread.currentThread() == this : "UDPConnectionManager.run() shouldn't be called by anyone.";
// try {
// socket.setSoTimeout(100);
// long lastTime = System.currentTimeMillis();
// while (running) {
// try {
// socket.receive(receivedPacket);
// onReceive();
// } catch(SocketTimeoutException e) {
// // No message within the last ~100 ms.
// }
//
// // Send a keep-alive packet roughly every 100 ms:
// if(System.currentTimeMillis() - lastTime > 100 && running) {
// lastTime = System.currentTimeMillis();
// for(UDPConnection connection : connections.toArray(new UDPConnection[0])) {
// if(lastTime - connection.lastConnection > CONNECTION_TIMEOUT && connection.isConnected()) {
// Logger.info("timeout");
// // Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times).
// connection.disconnect();
// } else {
// connection.sendKeepAlive();
// }
// }
// if(connections.isEmpty() && externalAddress != null) {
// // Send a message to external ip, to keep the port open:
// DatagramPacket packet = new DatagramPacket(new byte[0], 0);
// packet.setAddress(externalAddress);
// packet.setPort(externalPort);
// packet.setLength(0);
// send(packet);
// }
// }
// }
// } catch (Exception e) {
// Logger.crash(e);
// }
// }
//}
const UnconfirmedPacket = struct {
data: []const u8,
@ -292,7 +291,7 @@ pub const Connection = struct {
var packetsSent: u32 = 0;
var packetsResent: u32 = 0;
manager: ConnectionManager,
manager: *ConnectionManager,
gpa: std.heap.GeneralPurposeAllocator(.{}),
allocator: std.mem.Allocator,
@ -304,8 +303,8 @@ pub const Connection = struct {
streamBuffer: [maxImportantPacketSize]u8 = undefined,
streamPosition: u32 = importantHeaderSize,
messageID: u32 = 0,
unconfirmedPackets: std.ArrayList(UnconfirmedPacket),
receivedPackets: [3]std.ArrayList(u32),
unconfirmedPackets: std.ArrayList(UnconfirmedPacket) = undefined,
receivedPackets: [3]std.ArrayList(u32) = undefined,
lastReceivedPackets: [65536]?[]const u8 = undefined,
lastIndex: u32 = 0,
@ -317,11 +316,11 @@ pub const Connection = struct {
disconnected: bool = false,
handShakeComplete: bool = false,
lastConnection: i64 = 0,
lastConnection: i64,
mutex: std.Thread.Mutex = std.Thread.Mutex{},
pub fn init(manager: ConnectionManager, ipPort: []const u8) !*Connection {
pub fn init(manager: *ConnectionManager, ipPort: []const u8) !*Connection {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var result: *Connection = try gpa.allocator().create(Connection);
result.* = Connection {
@ -329,16 +328,19 @@ pub const Connection = struct {
.gpa = gpa,
.allocator = undefined,
.remoteAddress = undefined,
.unconfirmedPackets = std.ArrayList(UnconfirmedPacket).init(gpa.allocator()),
.receivedPackets = [3]std.ArrayList(u32){
std.ArrayList(u32).init(gpa.allocator()),
std.ArrayList(u32).init(gpa.allocator()),
std.ArrayList(u32).init(gpa.allocator()),
},
.lastConnection = std.time.milliTimestamp(),
};
result.allocator = result.gpa.allocator(); // The right reference(the one that isn't on the stack) needs to be used passed!
result.unconfirmedPackets = std.ArrayList(UnconfirmedPacket).init(result.allocator);
result.receivedPackets = [3]std.ArrayList(u32){
std.ArrayList(u32).init(result.allocator),
std.ArrayList(u32).init(result.allocator),
std.ArrayList(u32).init(result.allocator),
};
var splitter = std.mem.split(u8, ipPort, ":");
result.remoteAddress.ip = try result.allocator.dupe(u8, splitter.first());
var nullTerminatedIP = try main.threadAllocator.dupeZ(u8, splitter.first());
defer main.threadAllocator.free(nullTerminatedIP);
result.remoteAddress.ip = Socket.parseIP(nullTerminatedIP);
var port = splitter.rest();
if(port.len != 0 and port[0] == '?') {
result.bruteforcingPort = true;
@ -349,16 +351,16 @@ pub const Connection = struct {
break :blk settings.defaultPort;
};
// TODO: manager.addConnection(this);
try result.manager.addConnection(result);
return result;
}
pub fn deinit(self: *Connection) void {
self.disconnect();
self.unconfirmedPackets.deinit();
self.receivedPackets[0].deinit();
self.receivedPackets[1].deinit();
self.receivedPackets[2].deinit();
self.allocator.free(self.remoteAddress.ip);
var gpa = self.gpa;
gpa.allocator().destroy(self);
if(gpa.deinit()) {
@ -367,9 +369,6 @@ pub const Connection = struct {
}
fn flush(self: *Connection) !void {
self.mutex.lock();
defer self.mutex.unlock();
if(self.streamPosition == importantHeaderSize) return; // Don't send empty packets.
// Fill the header:
self.streamBuffer[0] = Protocol.important;
@ -384,7 +383,7 @@ pub const Connection = struct {
};
try self.unconfirmedPackets.append(packet);
packetsSent += 1;
self.manager.send(packet.data, self.remoteAddress);
try self.manager.send(packet.data, self.remoteAddress);
self.streamPosition = importantHeaderSize;
}
@ -455,7 +454,7 @@ pub const Connection = struct {
}
}
fn sendKeepAlive(self: *Connection) void {
fn sendKeepAlive(self: *Connection) !void {
self.mutex.lock();
defer self.mutex.unlock();
@ -468,22 +467,22 @@ pub const Connection = struct {
for(list.items) |packetID| {
var leftRegion: ?u32 = null;
var rightRegion: ?u32 = null;
for(runLengthEncodingStarts) |start, reg| {
for(runLengthEncodingStarts.items) |start, reg| {
var diff = packetID -% start;
if(diff < runLengthEncodingLengths.items[reg]) continue;
if(diff == runLengthEncodingLengths.items[reg]) {
leftRegion = reg;
leftRegion = @intCast(u32, reg);
}
if(diff == std.math.maxInt(u32)) {
rightRegion == reg;
rightRegion = @intCast(u32, reg);
}
}
if(leftRegion) |left| {
if(rightRegion) |right| {
// Needs to combine the regions:
runLengthEncodingLengths.items[left] += runLengthEncodingLengths.items[right] + 1;
runLengthEncodingStarts.swapRemove(right);
runLengthEncodingLengths.swapRemove(right);
_ = runLengthEncodingStarts.swapRemove(right);
_ = runLengthEncodingLengths.swapRemove(right);
} else {
runLengthEncodingLengths.items[left] += 1;
}
@ -511,34 +510,34 @@ pub const Connection = struct {
std.mem.writeIntBig(u32, output[1..5], self.lastKeepAliveSent);
self.lastKeepAliveSent += 1;
std.mem.writeIntBig(u32, output[5..9], self.otherKeepAliveReceived);
var remaining: []const u8 = output[9..];
for(runLengthEncodingStarts) |_, i| {
std.mem.writeIntBig(u32, remaining[0..4], self.runLengthEncodingStarts.items[i]);
std.mem.writeIntBig(u32, remaining[4..8], self.runLengthEncodingLengths.items[i]);
var remaining: []u8 = output[9..];
for(runLengthEncodingStarts.items) |_, i| {
std.mem.writeIntBig(u32, remaining[0..4], runLengthEncodingStarts.items[i]);
std.mem.writeIntBig(u32, remaining[4..8], runLengthEncodingLengths.items[i]);
remaining = remaining[8..];
}
self.manager.send(output, self.remoteAddress);
try self.manager.send(output, self.remoteAddress);
// Resend packets that didn't receive confirmation within the last 2 keep-alive signals.
for(self.unconfirmedPackets.items) |*packet| {
if(self.lastKeepAliveReceived - packet.lastKeepAliveSentBefore >= 2) {
packetsSent += 1;
packetsResent += 1;
self.manager.send(packet.data, self.remoteAddress);
try self.manager.send(packet.data, self.remoteAddress);
packet.lastKeepAliveSentBefore = self.lastKeepAliveSent;
}
}
self.flush();
try self.flush();
if(self.bruteforcingPort) {
// This is called every 100 ms, so if I send 10 requests it shouldn't be too bad.
var i: u16 = 0;
while(i < 5): (i += 1) {
var data = [0]u8{};
if(self.remoteAddress.port +% self.bruteForcedPortRange != 0) {
self.manager.send(data, Address{self.remoteAddress.ip, self.remoteAddress.port +% self.bruteForcedPortRange});
try self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port +% self.bruteForcedPortRange});
}
if(self.remoteAddress.port - self.bruteForcedPortRange != 0) {
self.manager.send(data, Address{self.remoteAddress.ip, self.remoteAddress.port -% self.bruteForcedPortRange});
try self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port -% self.bruteForcedPortRange});
}
self.bruteForcedPortRange +%= 1;
}
@ -569,7 +568,7 @@ pub const Connection = struct {
// Determine the next packet length:
var len: u32 = 0;
var shift: u32 = 0;
var shift: u5 = 0;
while(true) {
if(newIndex == receivedPacket.len) {
newIndex = 0;
@ -578,7 +577,7 @@ pub const Connection = struct {
}
var nextByte = receivedPacket[newIndex];
newIndex += 1;
len |= (nextByte & 0x7f) << shift;
len |= @intCast(u32, nextByte & 0x7f) << shift;
if(nextByte & 0x80 != 0) {
shift += 7;
} else {
@ -601,7 +600,7 @@ pub const Connection = struct {
while(remaining.len != 0) {
dataAvailable = @minimum(self.lastReceivedPackets[id & 65535].?.len - newIndex, remaining.len);
std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..dataAvailable]);
newIndex += dataAvailable;
newIndex += @intCast(u32, dataAvailable);
remaining = remaining[dataAvailable..];
if(newIndex == self.lastReceivedPackets[id & 65535].?.len) {
id += 1;
@ -619,10 +618,7 @@ pub const Connection = struct {
}
}
pub fn receive(self: *Connection, data: []const u8) void {
self.mutex.lock();
defer self.mutex.unlock();
pub fn receive(self: *Connection, data: []const u8) !void {
const protocol = data[0];
// TODO:
//if(!self.handShakeComplete and protocol != Protocols.HANDSHAKE.id and protocol != Protocol.KEEP_ALIVE and protocol != Protocol.important) {
@ -660,9 +656,9 @@ pub const Connection = struct {
if(id < self.lastIncompletePacket or self.lastReceivedPackets[id & 65535] != null) {
return; // Already received the package in the past.
}
self.lastReceivedPackets[id & 65535] = self.allocator.dupe(data[importantHeaderSize..]);
self.lastReceivedPackets[id & 65535] = try self.allocator.dupe(u8, data[importantHeaderSize..]);
// Check if a message got completed:
self.collectPackets();
try self.collectPackets();
} else if(protocol == Protocol.keepAlive) {
self.receiveKeepAlive(data[1..]);
} else {
@ -680,7 +676,7 @@ pub const Connection = struct {
// try {Thread.sleep(10);} catch(Exception e) {}
// Protocols.DISCONNECT.disconnect(this);
self.disconnected = true;
// TODO: manager.removeConnection(self);
std.log.info("Disconnected");
self.manager.removeConnection(self);
std.log.info("Disconnected", .{});
}
};

View File

@ -1,3 +1,4 @@
pub const defaultPort: u16 = 47649;
pub const defaultPort: u16 = 47649;
pub const connectionTimeout = 60000;