Add the client-side of the handshake.

This commit is contained in:
IntegratedQuantum 2022-09-11 16:45:34 +02:00
parent af54187d4b
commit 055ccf88d3
6 changed files with 396 additions and 33 deletions

1
.gitignore vendored
View File

@ -2,4 +2,5 @@ logs/
saves/
zig-out/
zig-cache/
serverAssets/
settings.json

View File

@ -1,6 +1,7 @@
const std = @import("std");
const ArrayList = std.ArrayList;
const Allocator = std.mem.Allocator;
const main = @import("main.zig");
const OutOfMemory = Allocator.Error;
@ -120,6 +121,104 @@ pub const JsonElement = union(JsonType) {
}
// TODO: toString()
fn escape(string: []const u8, allocator: Allocator) ![]const u8 {
var out = std.ArrayList(u8).init(allocator);
for(string) |char| {
switch(char) {
'\\' => try out.appendSlice("\\\\"),
'\n' => try out.appendSlice("\\n"),
'\"' => try out.appendSlice("\\\""),
'\t' => try out.appendSlice("\\t"),
else => try out.append(char),
}
}
return out.toOwnedSlice();
}
fn writeTabs(writer: std.ArrayList(u8).Writer, tabs: u32) !void {
var i: u32 = 0;
while(i < tabs): (i += 1) {
try writer.writeByte('\t');
}
}
fn recurseToString(json: JsonElement, writer: std.ArrayList(u8).Writer, tabs: u32, comptime visualCharacters: bool) !void {
switch(json) {
JsonType.JsonInt => |value| {
try std.fmt.formatInt(value, 10, .lower, .{}, writer);
},
JsonType.JsonFloat => |value| {
try std.fmt.formatFloatScientific(value, .{}, writer);
},
JsonType.JsonBool => |value| {
if(value) {
try writer.writeAll("true");
} else {
try writer.writeAll("false");
}
},
JsonType.JsonNull => {
try writer.writeAll("null");
},
JsonType.JsonString, JsonType.JsonStringOwned => |value| {
const escaped = try escape(value, main.threadAllocator);
try writer.writeByte('\"');
try writer.writeAll(escaped);
try writer.writeByte('\"');
main.threadAllocator.free(escaped);
},
JsonType.JsonArray => |array| {
try writer.writeByte('[');
for(array.items) |elem, i| {
if(i != 0) {
try writer.writeByte(',');
}
if(visualCharacters) try writer.writeByte('\n');
if(visualCharacters) try writeTabs(writer, tabs + 1);
try recurseToString(elem, writer, tabs + 1, visualCharacters);
}
if(visualCharacters) try writer.writeByte('\n');
if(visualCharacters) try writeTabs(writer, tabs);
try writer.writeByte(']');
},
JsonType.JsonObject => |obj| {
try writer.writeByte('{');
var iterator = obj.iterator();
var first: bool = true;
while(true) {
var elem = iterator.next() orelse break;
if(!first) {
try writer.writeByte(',');
}
if(visualCharacters) try writer.writeByte('\n');
if(visualCharacters) try writeTabs(writer, tabs + 1);
try writer.writeByte('\"');
try writer.writeAll(elem.key_ptr.*);
try writer.writeByte('\"');
if(visualCharacters) try writer.writeByte(' ');
try writer.writeByte(':');
if(visualCharacters) try writer.writeByte(' ');
try recurseToString(elem.value_ptr.*, writer, tabs + 1, visualCharacters);
first = false;
}
if(visualCharacters) try writer.writeByte('\n');
if(visualCharacters) try writeTabs(writer, tabs);
try writer.writeByte('}');
},
}
}
pub fn toString(json: JsonElement, allocator: Allocator) ![]const u8 {
var string = std.ArrayList(u8).init(allocator);
try recurseToString(json, string.writer(), 0, true);
return string.toOwnedSlice();
}
/// Ignores all the visual characters(spaces, tabs and newlines) and allows adding a custom prefix(which is for example required by networking).
pub fn toStringEfficient(json: JsonElement, allocator: Allocator, prefix: []const u8) ![]const u8 {
var string = std.ArrayList(u8).init(allocator);
try string.appendSlice(prefix);
try recurseToString(json, string.writer(), 0, false);
return string.toOwnedSlice();
}
};
pub fn parseFromString(allocator: Allocator, string: []const u8) JsonElement {

View File

@ -6,6 +6,7 @@ const chunk = @import("chunk.zig");
const graphics = @import("graphics.zig");
const renderer = @import("renderer.zig");
const network = @import("network.zig");
const utils = @import("utils.zig");
const Vec2f = @import("vec.zig").Vec2f;
@ -172,6 +173,8 @@ pub fn main() !void {
var conn2 = try network.Connection.init(conn, "127.0.0.1");
defer conn2.deinit();
try network.Protocols.handShake.clientSide(conn2, "quanturmdoelvloper");
c.glEnable(c.GL_CULL_FACE);
c.glCullFace(c.GL_BACK);
c.glEnable(c.GL_BLEND);

View File

@ -4,6 +4,9 @@ const Allocator = std.mem.Allocator;
const main = @import("main.zig");
const game = @import("game.zig");
const settings = @import("settings.zig");
const json = @import("json.zig");
const JsonElement = json.JsonElement;
const utils = @import("utils.zig");
//TODO: Might want to use SSL or something similar to encode the message
@ -505,10 +508,152 @@ const UnconfirmedPacket = struct {
id: u32,
};
fn addProtocol(comptime comptimeList: *[256]?*const fn(*Connection, []const u8) anyerror!void, comptime prot: type) type {
if(comptimeList[prot.id] == null and prot.id != 0 and prot.id != 0xff) {
comptimeList[prot.id] = prot.receive;
} else {
@compileError("Protocol id is already used.");
}
return prot;
}
pub const Protocols = blk: {
comptime var comptimeList = [_]?*const fn(*Connection, []const u8) anyerror!void{null} ** 256;
const Protocols_struct = struct {
list: [256]?*const fn(*Connection, []const u8) anyerror!void,
keepAlive: u8 = 0,
important: u8 = 0xff,
handShake: type = addProtocol(&comptimeList, struct {
const id: u8 = 1;
const stepStart: u8 = 0;
const stepUserData: u8 = 1;
const stepAssets: u8 = 2;
const stepServerData: u8 = 3;
const stepComplete: u8 = 255;
fn receive(conn: *Connection, data: []const u8) !void {
if(conn.handShakeState == data[0] - 1) {
conn.handShakeState = data[0];
switch(data[0]) {
stepUserData => {
var jsonObject = json.parseFromString(main.threadAllocator, data[1..]);
defer jsonObject.free(main.threadAllocator);
var name = jsonObject.get([]const u8, "name", "unnamed");
var version = jsonObject.get([]const u8, "version", "unknown");
std.log.info("User {s} joined using version {s}.", .{name, version});
{
// TODO: Send the world data.
var path = try std.fmt.allocPrint(main.threadAllocator, "saves/{s}/assets/", .{"Development"}); // TODO: Use world name.
defer main.threadAllocator.free(path);
var dir = try std.fs.cwd().openIterableDir(path, .{});
defer dir.close();
var arrayList = std.ArrayList(u8).init(main.threadAllocator);
defer arrayList.deinit();
try arrayList.append(stepAssets);
try utils.Compression.pack(dir, arrayList.writer());
std.log.debug("{any}", .{arrayList.items});
try conn.sendImportant(id, arrayList.items);
try conn.flush();
}
// TODO:
// JsonObject jsonObject = new JsonObject();
// ((User)conn).initPlayer(name);
// jsonObject.put("player", ((User)conn).player.save());
// jsonObject.put("player_id", ((User)conn).player.id);
// jsonObject.put("blockPalette", Server.world.blockPalette.save());
// JsonObject spawn = new JsonObject();
// spawn.put("x", Server.world.spawn.x);
// spawn.put("y", Server.world.spawn.y);
// spawn.put("z", Server.world.spawn.z);
// jsonObject.put("spawn", spawn);
// byte[] string = jsonObject.toString().getBytes(StandardCharsets.UTF_8);
// byte[] outData = new byte[string.length + 1];
// outData[0] = STEP_SERVER_DATA;
// System.arraycopy(string, 0, outData, 1, string.length);
// state.put(conn, STEP_SERVER_DATA);
// conn.sendImportant(this, outData);
// state.remove(conn); // Handshake is done.
// conn.handShakeComplete = true;
// synchronized(conn) { // Notify the waiting server thread.
// conn.notifyAll();
// }
},
stepAssets => {
std.log.info("Received assets.", .{});
std.fs.cwd().deleteTree("serverAssets") catch {}; // Delete old assets.
try std.fs.cwd().makePath("serverAssets");
try utils.Compression.unpack(try std.fs.cwd().openDir("serverAssets", .{}), data[1..]);
},
stepServerData => {
var jsonObject = json.parseFromString(main.threadAllocator, data[1..]);
defer jsonObject.free(main.threadAllocator);
// TODO: ((ServerConnection)conn).world.finishHandshake(json);
conn.handShakeState = stepComplete;
conn.handShakeWaiting.broadcast(); // Notify the waiting client thread.
},
stepComplete => {
},
else => {
std.log.err("Unknown state in HandShakeProtocol {}", .{data[0]});
},
}
} else {
// Ignore packages that refer to an unexpected state. Normally those might be packages that were resent by the other side.
}
_ = data;
}
pub fn serverSide(conn: *Connection) void {
conn.handShakeState = stepStart;
}
pub fn clientSide(conn: *Connection, name: []const u8) !void {
var jsonObject = JsonElement{.JsonObject=try main.threadAllocator.create(std.StringHashMap(JsonElement))};
defer jsonObject.free(main.threadAllocator);
jsonObject.JsonObject.* = std.StringHashMap(JsonElement).init(main.threadAllocator);
try jsonObject.JsonObject.put(try main.threadAllocator.dupe(u8, "version"), JsonElement{.JsonString=settings.version});
try jsonObject.JsonObject.put(try main.threadAllocator.dupe(u8, "name"), JsonElement{.JsonString=name});
var prefix = [1]u8 {stepUserData};
var data = try jsonObject.toStringEfficient(main.threadAllocator, &prefix);
defer main.threadAllocator.free(data);
try conn.sendImportant(id, data);
conn.handShakeState = stepUserData;
conn.mutex.lock();
conn.handShakeWaiting.wait(&conn.mutex);
conn.mutex.unlock();
}
}),
};
break :blk Protocols_struct{.list = comptimeList};
};
//public final class Protocols {
// public static final int[] bytesReceived = new int[256];
// public static final int[] packetsReceived = new int[256];
//
// public static final byte KEEP_ALIVE = 0;
// public static final byte IMPORTANT_PACKET = (byte)0xff;
// public static final HandshakeProtocol HANDSHAKE = new HandshakeProtocol();
// public static final ChunkRequestProtocol CHUNK_REQUEST = new ChunkRequestProtocol();
// public static final ChunkTransmissionProtocol CHUNK_TRANSMISSION = new ChunkTransmissionProtocol();
// public static final PlayerPositionProtocol PLAYER_POSITION = new PlayerPositionProtocol();
// public static final DisconnectProtocol DISCONNECT = new DisconnectProtocol();
// public static final EntityPositionProtocol ENTITY_POSITION = new EntityPositionProtocol();
// public static final BlockUpdateProtocol BLOCK_UPDATE = new BlockUpdateProtocol();
// public static final EntityProtocol ENTITY = new EntityProtocol();
// public static final GenericUpdateProtocol GENERIC_UPDATE = new GenericUpdateProtocol();
// public static final ChatProtocol CHAT = new ChatProtocol();
//}
const Protocol = struct {
id: u8,
const keepAlive: u8 = 0;
const important: u8 = 0xff;
}; // TODO
@ -535,7 +680,7 @@ pub const Connection = struct {
messageID: u32 = 0,
unconfirmedPackets: std.ArrayList(UnconfirmedPacket) = undefined,
receivedPackets: [3]std.ArrayList(u32) = undefined,
lastReceivedPackets: [65536]?[]const u8 = undefined,
lastReceivedPackets: [65536]?[]const u8 = [_]?[]const u8{null} ** 65536,
lastIndex: u32 = 0,
lastIncompletePacket: u32 = 0,
@ -545,7 +690,8 @@ pub const Connection = struct {
otherKeepAliveReceived: u32 = 0,
disconnected: bool = false,
handShakeComplete: bool = false,
handShakeState: u8 = Protocols.handShake.stepStart,
handShakeWaiting: std.Thread.Condition = std.Thread.Condition{},
lastConnection: i64,
mutex: std.Thread.Mutex = std.Thread.Mutex{},
@ -618,40 +764,40 @@ pub const Connection = struct {
self.streamPosition = importantHeaderSize;
}
fn writeByteToStream(self: *Connection, data: u8) void {
fn writeByteToStream(self: *Connection, data: u8) !void {
self.streamBuffer[self.streamPosition] = data;
self.streamPosition += 1;
if(self.streamPosition == self.streamBuffer.length) {
self.flush();
if(self.streamPosition == self.streamBuffer.len) {
try self.flush();
}
}
pub fn sendImportant(self: *Connection, source: Protocol, data: []const u8) void {
pub fn sendImportant(self: *Connection, id: u8, data: []const u8) !void {
self.mutex.lock();
defer self.mutex.unlock();
if(self.disconnected) return;
self.writeByteToStream(source.id);
try self.writeByteToStream(id);
var processedLength = data.len;
while(processedLength > 0x7f) {
self.writeByteToStream(@intCast(u8, processedLength & 0x7f) | 0x80);
try self.writeByteToStream(@intCast(u8, processedLength & 0x7f) | 0x80);
processedLength >>= 7;
}
self.writeByteToStream(@intCast(u8, processedLength & 0x7f));
try self.writeByteToStream(@intCast(u8, processedLength & 0x7f));
var remaining: []const u8 = data;
while(remaining.len != 0) {
var copyableSize = @minimum(remaining.len, self.streamBuffer.len - self.streamPosition);
std.mem.copy(u8, self.streamBuffer, remaining[0..copyableSize]);
std.mem.copy(u8, self.streamBuffer[self.streamPosition..], remaining[0..copyableSize]);
remaining = remaining[copyableSize..];
self.streamPosition += copyableSize;
self.streamPosition += @intCast(u32, copyableSize);
if(self.streamPosition == self.streamBuffer.len) {
self.flush();
try self.flush();
}
}
}
pub fn sendUnimportant(self: *Connection, source: Protocol, data: []const u8) !void {
pub fn sendUnimportant(self: *Connection, id: u8, data: []const u8) !void {
self.mutex.lock();
defer self.mutex.unlock();
@ -659,7 +805,7 @@ pub const Connection = struct {
std.debug.assert(data.len + 1 < maxPacketSize);
var fullData = try main.threadAllocator.alloc(u8, data.len + 1);
defer main.threadAllocator.free(fullData);
fullData[0] = source.id;
fullData[0] = id;
std.mem.copy(u8, fullData[1..], data);
self.manager.send(fullData, self.remoteAddress);
}
@ -672,14 +818,17 @@ pub const Connection = struct {
self.lastKeepAliveReceived = std.mem.readIntBig(u32, data[4..8]);
var remaining: []const u8 = data[8..];
while(remaining.len >= 8) {
var start = std.mem.readIntBig(u32, data[0..4]);
var len = std.mem.readIntBig(u32, data[4..8]);
var start = std.mem.readIntBig(u32, remaining[0..4]);
var len = std.mem.readIntBig(u32, remaining[4..8]);
remaining = remaining[8..];
var j: usize = 0;
while(j < self.unconfirmedPackets.items.len): (j += 1) {
while(j < self.unconfirmedPackets.items.len) {
var diff = self.unconfirmedPackets.items[j].id -% start;
if(diff < len) {
self.allocator.free(self.unconfirmedPackets.items[j].data);
_ = self.unconfirmedPackets.swapRemove(j);
j -= 1;
} else {
j += 1;
}
}
}
@ -751,7 +900,7 @@ pub const Connection = struct {
// Resend packets that didn't receive confirmation within the last 2 keep-alive signals.
for(self.unconfirmedPackets.items) |*packet| {
if(self.lastKeepAliveReceived - packet.lastKeepAliveSentBefore >= 2) {
if(self.lastKeepAliveReceived -% packet.lastKeepAliveSentBefore >= 2) {
packetsSent += 1;
packetsResent += 1;
try self.manager.send(packet.data, self.remoteAddress);
@ -783,19 +932,14 @@ pub const Connection = struct {
}
fn collectPackets(self: *Connection) !void {
self.mutex.lock();
defer self.mutex.unlock();
while(true) {
var id = self.lastIncompletePacket;
var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return;
var newIndex = self.lastIndex;
var protocol = receivedPacket[newIndex];
newIndex += 1;
// TODO:
_ = protocol;
// if(Cubyz.world == null && protocol != Protocols.HANDSHAKE.id)
// return;
if(game.world == null and protocol != Protocols.handShake.id)
return;
// Determine the next packet length:
var len: u32 = 0;
@ -830,7 +974,7 @@ pub const Connection = struct {
var remaining = data[0..];
while(remaining.len != 0) {
dataAvailable = @minimum(self.lastReceivedPackets[id & 65535].?.len - newIndex, remaining.len);
std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..dataAvailable]);
std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..newIndex + dataAvailable]);
newIndex += @intCast(u32, dataAvailable);
remaining = remaining[dataAvailable..];
if(newIndex == self.lastReceivedPackets[id & 65535].?.len) {
@ -845,23 +989,27 @@ pub const Connection = struct {
self.lastIndex = newIndex;
// TODO:
// Protocols.bytesReceived[protocol & 0xff] += data.length + 1;
// Protocols.list[protocol].receive(this, data, 0, data.length);
if(Protocols.list[protocol]) |prot| {
try prot(self, data);
} else {
std.log.warn("Received unknown protocol width id {}", .{protocol});
}
}
}
pub fn receive(self: *Connection, data: []const u8) !void {
const protocol = data[0];
// TODO:
//if(!self.handShakeComplete and protocol != Protocols.HANDSHAKE.id and protocol != Protocol.KEEP_ALIVE and protocol != Protocol.important) {
// return; // Reject all non-handshake packets until the handshake is done.
//}
if(self.handShakeState != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) {
return; // Reject all non-handshake packets until the handshake is done.
}
self.lastConnection = std.time.milliTimestamp();
// TODO:
// Protocols.bytesReceived[protocol & 0xff] += len + 20 + 8; // Including IP header and udp header
// Protocols.packetsReceived[protocol & 0xff]++;
if(protocol == Protocol.important) {
var id = std.mem.readIntBig(u32, data[1..5]);
if(self.handShakeComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected.
if(self.handShakeState == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected.
// TODO:
// if(this instanceof User) {
// Server.disconnect((User)this);

View File

@ -1,4 +1,6 @@
pub const defaultPort: u16 = 47649;
pub const connectionTimeout = 60000;
pub const connectionTimeout = 60000;
pub const version = "0.12.0";

110
src/utils.zig Normal file
View File

@ -0,0 +1,110 @@
const std = @import("std");
const main = @import("main.zig");
pub const Compression = struct {
pub fn pack(sourceDir: std.fs.IterableDir, writer: anytype) !void {
var comp = try std.compress.deflate.compressor(main.threadAllocator, writer, .{.level = .no_compression});
defer comp.deinit();
var walker = try sourceDir.walk(main.threadAllocator);
defer walker.deinit();
while(try walker.next()) |entry| {
if(entry.kind == .File) {
var relPath = entry.path;
var len: [4]u8 = undefined;
std.mem.writeIntBig(u32, &len, @intCast(u32, relPath.len));
_ = try comp.write(&len);
_ = try comp.write(relPath);
var file = try sourceDir.dir.openFile(relPath, .{});
defer file.close();
var fileData = try file.readToEndAlloc(main.threadAllocator, std.math.maxInt(u32));
defer main.threadAllocator.free(fileData);
std.mem.writeIntBig(u32, &len, @intCast(u32, fileData.len));
_ = try comp.write(&len);
_ = try comp.write(fileData);
}
}
try comp.close();
}
pub fn unpack(outDir: std.fs.Dir, input: []const u8) !void {
var stream = std.io.fixedBufferStream(input);
var decomp = try std.compress.deflate.decompressor(main.threadAllocator, stream.reader(), null);
defer decomp.deinit();
var reader = decomp.reader();
const _data = try reader.readAllAlloc(main.threadAllocator, std.math.maxInt(usize));
defer main.threadAllocator.free(_data);
var data = _data;
while(data.len != 0) {
var len = std.mem.readIntBig(u32, data[0..4]);
data = data[4..];
var path = data[0..len];
data = data[len..];
len = std.mem.readIntBig(u32, data[0..4]);
data = data[4..];
var fileData = data[0..len];
data = data[len..];
var splitter = std.mem.splitBackwards(u8, path, "/");
_ = splitter.first();
try outDir.makePath(splitter.rest());
var file = try outDir.createFile(path, .{});
defer file.close();
try file.writeAll(fileData);
}
}
// public static void pack(String sourceDirPath, OutputStream outputstream){
// try {
// DeflaterOutputStream compressedOut = new DeflaterOutputStream(outputstream);
// Path path = Paths.get(sourceDirPath);
// Files.walk(path)
// .filter(p -> !Files.isDirectory(p)) // potential bug
// .forEach(p -> {
// String relPath = path.relativize(p).toString();
// try {
// byte[] strData = relPath.getBytes(StandardCharsets.UTF_8);
// byte[] len = new byte[4];
// Bits.putInt(len, 0, strData.length);
// compressedOut.write(len);
// compressedOut.write(strData);
// byte[] file = Files.readAllBytes(p);
// Bits.putInt(len, 0, file.length);
// compressedOut.write(len);
// compressedOut.write(file);
// } catch (IOException e) {
// Logger.error(e);
// }
// });
// compressedOut.close();
// }catch(IOException exception){
// Logger.error(exception);
// }
// }
// public static void unpack(String outputFolderPath, InputStream inputStream){
// try {
// File outputFolder = new File(outputFolderPath);
// if (!outputFolder.exists()) {
// outputFolder.mkdir();
// }
// InflaterInputStream compressedIn = new InflaterInputStream(inputStream);
// while(compressedIn.available() != 0) {
// byte[] len = compressedIn.readNBytes(4);
// byte[] pathBytes = compressedIn.readNBytes(Bits.getInt(len ,0));
// String path = new String(pathBytes, StandardCharsets.UTF_8);
// String filePath = outputFolder.getAbsolutePath() + File.separator + path;
// len = compressedIn.readNBytes(4);
// byte[] fileBytes = compressedIn.readNBytes(Bits.getInt(len ,0));
// new File(filePath).getParentFile().mkdirs();
// BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
// bos.write(fileBytes, 0, fileBytes.length);
// bos.close();
// }
// compressedIn.close();
// }catch (Exception e){
// Logger.error(e);
// }
// }
};