mirror of
https://github.com/PixelGuys/Cubyz.git
synced 2025-09-12 05:53:51 -04:00
Add some more protocols and fix some stuff.
This commit is contained in:
parent
055ccf88d3
commit
0e90e2b4e6
108
src/network.zig
108
src/network.zig
@ -1,6 +1,7 @@
|
|||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const Allocator = std.mem.Allocator;
|
const Allocator = std.mem.Allocator;
|
||||||
|
|
||||||
|
const chunk = @import("chunk.zig");
|
||||||
const main = @import("main.zig");
|
const main = @import("main.zig");
|
||||||
const game = @import("game.zig");
|
const game = @import("game.zig");
|
||||||
const settings = @import("settings.zig");
|
const settings = @import("settings.zig");
|
||||||
@ -308,6 +309,7 @@ const STUN = struct {
|
|||||||
pub const ConnectionManager = struct {
|
pub const ConnectionManager = struct {
|
||||||
socket: Socket = undefined,
|
socket: Socket = undefined,
|
||||||
thread: std.Thread = undefined,
|
thread: std.Thread = undefined,
|
||||||
|
threadId: std.Thread.Id = undefined,
|
||||||
externalAddress: ?Address = null,
|
externalAddress: ?Address = null,
|
||||||
online: bool = false,
|
online: bool = false,
|
||||||
running: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
|
running: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
|
||||||
@ -346,7 +348,7 @@ pub const ConnectionManager = struct {
|
|||||||
Socket.deinit(self.socket);
|
Socket.deinit(self.socket);
|
||||||
|
|
||||||
for(self.connections.items) |conn| {
|
for(self.connections.items) |conn| {
|
||||||
conn.disconnect();
|
conn.disconnect() catch |err| {std.log.warn("Error while disconnecting: {s}", .{@errorName(err)});};
|
||||||
}
|
}
|
||||||
self.connections.deinit();
|
self.connections.deinit();
|
||||||
for(self.requests.items) |request| {
|
for(self.requests.items) |request| {
|
||||||
@ -424,8 +426,8 @@ pub const ConnectionManager = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn onReceive(self: *ConnectionManager, data: []const u8, source: Address) !void {
|
fn onReceive(self: *ConnectionManager, data: []const u8, source: Address) !void {
|
||||||
|
std.debug.assert(self.threadId == std.Thread.getCurrentId());
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
|
||||||
|
|
||||||
for(self.connections.items) |conn| {
|
for(self.connections.items) |conn| {
|
||||||
if(conn.remoteAddress.ip == source.ip) {
|
if(conn.remoteAddress.ip == source.ip) {
|
||||||
@ -434,11 +436,13 @@ pub const ConnectionManager = struct {
|
|||||||
conn.bruteforcingPort = false;
|
conn.bruteforcingPort = false;
|
||||||
}
|
}
|
||||||
if(conn.remoteAddress.port == source.port) {
|
if(conn.remoteAddress.port == source.port) {
|
||||||
|
self.mutex.unlock();
|
||||||
try conn.receive(data);
|
try conn.receive(data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
defer self.mutex.unlock();
|
||||||
// Check if it's part of an active request:
|
// Check if it's part of an active request:
|
||||||
for(self.requests.items) |request| {
|
for(self.requests.items) |request| {
|
||||||
if(request.address.ip == source.ip and request.address.port == source.port) {
|
if(request.address.ip == source.ip and request.address.port == source.port) {
|
||||||
@ -454,6 +458,7 @@ pub const ConnectionManager = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(self: *ConnectionManager) !void {
|
pub fn run(self: *ConnectionManager) !void {
|
||||||
|
self.threadId = std.Thread.getCurrentId();
|
||||||
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
|
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
|
||||||
main.threadAllocator = gpa.allocator();
|
main.threadAllocator = gpa.allocator();
|
||||||
defer if(gpa.deinit()) {
|
defer if(gpa.deinit()) {
|
||||||
@ -485,7 +490,7 @@ pub const ConnectionManager = struct {
|
|||||||
std.log.info("timeout", .{});
|
std.log.info("timeout", .{});
|
||||||
// Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times).
|
// Timeout a connection if it was connect at some point. New connections are not timed out because that could annoy players(having to restart the connection several times).
|
||||||
self.mutex.unlock();
|
self.mutex.unlock();
|
||||||
conn.disconnect();
|
try conn.disconnect();
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
} else {
|
} else {
|
||||||
try conn.sendKeepAlive();
|
try conn.sendKeepAlive();
|
||||||
@ -517,10 +522,13 @@ fn addProtocol(comptime comptimeList: *[256]?*const fn(*Connection, []const u8)
|
|||||||
return prot;
|
return prot;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub var bytesReceived: [256]usize = [_]usize {0} ** 256;
|
||||||
|
pub var packetsReceived: [256]usize = [_]usize {0} ** 256;
|
||||||
pub const Protocols = blk: {
|
pub const Protocols = blk: {
|
||||||
comptime var comptimeList = [_]?*const fn(*Connection, []const u8) anyerror!void{null} ** 256;
|
comptime var comptimeList = [_]?*const fn(*Connection, []const u8) anyerror!void{null} ** 256;
|
||||||
const Protocols_struct = struct {
|
const Protocols_struct = struct {
|
||||||
list: [256]?*const fn(*Connection, []const u8) anyerror!void,
|
list: [256]?*const fn(*Connection, []const u8) anyerror!void,
|
||||||
|
|
||||||
keepAlive: u8 = 0,
|
keepAlive: u8 = 0,
|
||||||
important: u8 = 0xff,
|
important: u8 = 0xff,
|
||||||
handShake: type = addProtocol(&comptimeList, struct {
|
handShake: type = addProtocol(&comptimeList, struct {
|
||||||
@ -627,15 +635,52 @@ pub const Protocols = blk: {
|
|||||||
conn.mutex.unlock();
|
conn.mutex.unlock();
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
chunkRequest: type = addProtocol(&comptimeList, struct {
|
||||||
|
const id: u8 = 2;
|
||||||
|
fn receive(conn: *Connection, data: []const u8) !void {
|
||||||
|
var remaining = data[0..];
|
||||||
|
while(remaining.len >= 16) {
|
||||||
|
const request = chunk.ChunkPosition{
|
||||||
|
.wx = std.mem.readIntBig(chunk.ChunkCoordinate, data[0..4]),
|
||||||
|
.wy = std.mem.readIntBig(chunk.ChunkCoordinate, data[4..8]),
|
||||||
|
.wz = std.mem.readIntBig(chunk.ChunkCoordinate, data[8..12]),
|
||||||
|
.voxelSize = std.mem.readIntBig(chunk.ChunkCoordinate, data[12..16]),
|
||||||
|
};
|
||||||
|
_ = request;
|
||||||
|
_ = conn;
|
||||||
|
// TODO: Server.world.queueChunk(request, (User)conn);
|
||||||
|
remaining = remaining[16..];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn sendRequest(conn: *Connection, requests: []chunk.ChunkPosition) !void {
|
||||||
|
if(requests.len == 0) return;
|
||||||
|
var data = main.threadAllocator.alloc(16*requests.len);
|
||||||
|
defer main.threadAllocator.free(data);
|
||||||
|
var remaining = data;
|
||||||
|
for(requests) |req| {
|
||||||
|
std.mem.writeIntBig(chunk.ChunkCoordinate, data[0..4], req.wx);
|
||||||
|
std.mem.writeIntBig(chunk.ChunkCoordinate, data[4..8], req.wy);
|
||||||
|
std.mem.writeIntBig(chunk.ChunkCoordinate, data[8..12], req.wz);
|
||||||
|
std.mem.writeIntBig(chunk.ChunkCoordinate, data[12..16], req.voxelSize);
|
||||||
|
remaining = remaining[16..];
|
||||||
|
}
|
||||||
|
conn.sendImportant(id, data);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
disconnect: type = addProtocol(&comptimeList, struct {
|
||||||
|
const id: u8 = 5;
|
||||||
|
fn receive(conn: *Connection, _: []const u8) !void {
|
||||||
|
try conn.disconnect();
|
||||||
|
}
|
||||||
|
pub fn disconnect(conn: *Connection) !void {
|
||||||
|
const noData = [0]u8 {};
|
||||||
|
try conn.sendUnimportant(id, &noData);
|
||||||
|
}
|
||||||
|
}),
|
||||||
};
|
};
|
||||||
break :blk Protocols_struct{.list = comptimeList};
|
break :blk Protocols_struct{.list = comptimeList};
|
||||||
};
|
};
|
||||||
//public final class Protocols {
|
//public final class Protocols {
|
||||||
// public static final int[] bytesReceived = new int[256];
|
|
||||||
// public static final int[] packetsReceived = new int[256];
|
|
||||||
//
|
|
||||||
// public static final byte KEEP_ALIVE = 0;
|
|
||||||
// public static final byte IMPORTANT_PACKET = (byte)0xff;
|
|
||||||
// public static final HandshakeProtocol HANDSHAKE = new HandshakeProtocol();
|
// public static final HandshakeProtocol HANDSHAKE = new HandshakeProtocol();
|
||||||
// public static final ChunkRequestProtocol CHUNK_REQUEST = new ChunkRequestProtocol();
|
// public static final ChunkRequestProtocol CHUNK_REQUEST = new ChunkRequestProtocol();
|
||||||
// public static final ChunkTransmissionProtocol CHUNK_TRANSMISSION = new ChunkTransmissionProtocol();
|
// public static final ChunkTransmissionProtocol CHUNK_TRANSMISSION = new ChunkTransmissionProtocol();
|
||||||
@ -733,11 +778,16 @@ pub const Connection = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Connection) void {
|
pub fn deinit(self: *Connection) void {
|
||||||
self.disconnect();
|
self.disconnect() catch |err| {std.log.warn("Error while disconnecting: {s}", .{@errorName(err)});};
|
||||||
self.unconfirmedPackets.deinit();
|
self.unconfirmedPackets.deinit();
|
||||||
self.receivedPackets[0].deinit();
|
self.receivedPackets[0].deinit();
|
||||||
self.receivedPackets[1].deinit();
|
self.receivedPackets[1].deinit();
|
||||||
self.receivedPackets[2].deinit();
|
self.receivedPackets[2].deinit();
|
||||||
|
for(self.lastReceivedPackets) |nullablePacket| {
|
||||||
|
if(nullablePacket) |packet| {
|
||||||
|
self.allocator.free(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
var gpa = self.gpa;
|
var gpa = self.gpa;
|
||||||
gpa.allocator().destroy(self);
|
gpa.allocator().destroy(self);
|
||||||
if(gpa.deinit()) {
|
if(gpa.deinit()) {
|
||||||
@ -807,10 +857,11 @@ pub const Connection = struct {
|
|||||||
defer main.threadAllocator.free(fullData);
|
defer main.threadAllocator.free(fullData);
|
||||||
fullData[0] = id;
|
fullData[0] = id;
|
||||||
std.mem.copy(u8, fullData[1..], data);
|
std.mem.copy(u8, fullData[1..], data);
|
||||||
self.manager.send(fullData, self.remoteAddress);
|
try self.manager.send(fullData, self.remoteAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn receiveKeepAlive(self: *Connection, data: []const u8) void {
|
fn receiveKeepAlive(self: *Connection, data: []const u8) void {
|
||||||
|
std.debug.assert(self.manager.threadId == std.Thread.getCurrentId());
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
|
|
||||||
@ -835,6 +886,7 @@ pub const Connection = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn sendKeepAlive(self: *Connection) !void {
|
fn sendKeepAlive(self: *Connection) !void {
|
||||||
|
std.debug.assert(self.manager.threadId == std.Thread.getCurrentId());
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
|
|
||||||
@ -900,7 +952,7 @@ pub const Connection = struct {
|
|||||||
|
|
||||||
// Resend packets that didn't receive confirmation within the last 2 keep-alive signals.
|
// Resend packets that didn't receive confirmation within the last 2 keep-alive signals.
|
||||||
for(self.unconfirmedPackets.items) |*packet| {
|
for(self.unconfirmedPackets.items) |*packet| {
|
||||||
if(self.lastKeepAliveReceived -% packet.lastKeepAliveSentBefore >= 2) {
|
if(self.lastKeepAliveReceived -% @as(i33, packet.lastKeepAliveSentBefore) >= 2) {
|
||||||
packetsSent += 1;
|
packetsSent += 1;
|
||||||
packetsResent += 1;
|
packetsResent += 1;
|
||||||
try self.manager.send(packet.data, self.remoteAddress);
|
try self.manager.send(packet.data, self.remoteAddress);
|
||||||
@ -932,6 +984,7 @@ pub const Connection = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn collectPackets(self: *Connection) !void {
|
fn collectPackets(self: *Connection) !void {
|
||||||
|
std.debug.assert(self.manager.threadId == std.Thread.getCurrentId());
|
||||||
while(true) {
|
while(true) {
|
||||||
var id = self.lastIncompletePacket;
|
var id = self.lastIncompletePacket;
|
||||||
var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return;
|
var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return;
|
||||||
@ -987,26 +1040,24 @@ pub const Connection = struct {
|
|||||||
self.lastReceivedPackets[self.lastIncompletePacket & 65535] = null;
|
self.lastReceivedPackets[self.lastIncompletePacket & 65535] = null;
|
||||||
}
|
}
|
||||||
self.lastIndex = newIndex;
|
self.lastIndex = newIndex;
|
||||||
// TODO:
|
bytesReceived[protocol] += data.len + 1 + (7 + std.math.log2_int(usize, 1 + data.len))/7;
|
||||||
// Protocols.bytesReceived[protocol & 0xff] += data.length + 1;
|
|
||||||
if(Protocols.list[protocol]) |prot| {
|
if(Protocols.list[protocol]) |prot| {
|
||||||
try prot(self, data);
|
try prot(self, data);
|
||||||
} else {
|
} else {
|
||||||
std.log.warn("Received unknown protocol width id {}", .{protocol});
|
std.log.warn("Received unknown important protocol width id {}", .{protocol});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn receive(self: *Connection, data: []const u8) !void {
|
pub fn receive(self: *Connection, data: []const u8) !void {
|
||||||
|
std.debug.assert(self.manager.threadId == std.Thread.getCurrentId());
|
||||||
const protocol = data[0];
|
const protocol = data[0];
|
||||||
// TODO:
|
|
||||||
if(self.handShakeState != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) {
|
if(self.handShakeState != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) {
|
||||||
return; // Reject all non-handshake packets until the handshake is done.
|
return; // Reject all non-handshake packets until the handshake is done.
|
||||||
}
|
}
|
||||||
self.lastConnection = std.time.milliTimestamp();
|
self.lastConnection = std.time.milliTimestamp();
|
||||||
// TODO:
|
bytesReceived[protocol] += data.len + 20 + 8; // Including IP header and udp header;
|
||||||
// Protocols.bytesReceived[protocol & 0xff] += len + 20 + 8; // Including IP header and udp header
|
packetsReceived[protocol] += 1;
|
||||||
// Protocols.packetsReceived[protocol & 0xff]++;
|
|
||||||
if(protocol == Protocol.important) {
|
if(protocol == Protocol.important) {
|
||||||
var id = std.mem.readIntBig(u32, data[1..5]);
|
var id = std.mem.readIntBig(u32, data[1..5]);
|
||||||
if(self.handShakeState == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected.
|
if(self.handShakeState == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected.
|
||||||
@ -1027,7 +1078,7 @@ pub const Connection = struct {
|
|||||||
// Logger.error("Server 'reconnected'? This makes no sense and the game can't handle that.");
|
// Logger.error("Server 'reconnected'? This makes no sense and the game can't handle that.");
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
if(id - self.lastIncompletePacket >= 65536) {
|
if(id - @as(i33, self.lastIncompletePacket) >= 65536) {
|
||||||
std.log.warn("Many incomplete packages. Cannot process any more packages for now.", .{});
|
std.log.warn("Many incomplete packages. Cannot process any more packages for now.", .{});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -1041,19 +1092,22 @@ pub const Connection = struct {
|
|||||||
} else if(protocol == Protocol.keepAlive) {
|
} else if(protocol == Protocol.keepAlive) {
|
||||||
self.receiveKeepAlive(data[1..]);
|
self.receiveKeepAlive(data[1..]);
|
||||||
} else {
|
} else {
|
||||||
// TODO: Protocols.list[protocol & 0xff].receive(this, data, 1, len - 1);
|
if(Protocols.list[protocol]) |prot| {
|
||||||
|
try prot(self, data);
|
||||||
|
} else {
|
||||||
|
std.log.warn("Received unknown protocol width id {}", .{protocol});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnect(self: *Connection) void {
|
pub fn disconnect(self: *Connection) !void {
|
||||||
// Send 3 disconnect packages to the other side, just to be sure.
|
// Send 3 disconnect packages to the other side, just to be sure.
|
||||||
// If all of them don't get through then there is probably a network issue anyways which would lead to a timeout.
|
// If all of them don't get through then there is probably a network issue anyways which would lead to a timeout.
|
||||||
// TODO:
|
try Protocols.disconnect.disconnect(self);
|
||||||
// Protocols.DISCONNECT.disconnect(this);
|
std.time.sleep(10000000);
|
||||||
// try {Thread.sleep(10);} catch(Exception e) {}
|
try Protocols.disconnect.disconnect(self);
|
||||||
// Protocols.DISCONNECT.disconnect(this);
|
std.time.sleep(10000000);
|
||||||
// try {Thread.sleep(10);} catch(Exception e) {}
|
try Protocols.disconnect.disconnect(self);
|
||||||
// Protocols.DISCONNECT.disconnect(this);
|
|
||||||
self.disconnected = true;
|
self.disconnected = true;
|
||||||
self.manager.removeConnection(self);
|
self.manager.removeConnection(self);
|
||||||
std.log.info("Disconnected", .{});
|
std.log.info("Disconnected", .{});
|
||||||
|
Loading…
x
Reference in New Issue
Block a user