diff --git a/.gitignore b/.gitignore index 610adde6..a849fe3f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ logs/ saves/ zig-out/ zig-cache/ +serverAssets/ settings.json \ No newline at end of file diff --git a/src/json.zig b/src/json.zig index 3296b5bc..5c801c72 100644 --- a/src/json.zig +++ b/src/json.zig @@ -1,6 +1,7 @@ const std = @import("std"); const ArrayList = std.ArrayList; const Allocator = std.mem.Allocator; +const main = @import("main.zig"); const OutOfMemory = Allocator.Error; @@ -120,6 +121,104 @@ pub const JsonElement = union(JsonType) { } // TODO: toString() + fn escape(string: []const u8, allocator: Allocator) ![]const u8 { + var out = std.ArrayList(u8).init(allocator); + for(string) |char| { + switch(char) { + '\\' => try out.appendSlice("\\\\"), + '\n' => try out.appendSlice("\\n"), + '\"' => try out.appendSlice("\\\""), + '\t' => try out.appendSlice("\\t"), + else => try out.append(char), + } + } + return out.toOwnedSlice(); + } + fn writeTabs(writer: std.ArrayList(u8).Writer, tabs: u32) !void { + var i: u32 = 0; + while(i < tabs): (i += 1) { + try writer.writeByte('\t'); + } + } + fn recurseToString(json: JsonElement, writer: std.ArrayList(u8).Writer, tabs: u32, comptime visualCharacters: bool) !void { + switch(json) { + JsonType.JsonInt => |value| { + try std.fmt.formatInt(value, 10, .lower, .{}, writer); + }, + JsonType.JsonFloat => |value| { + try std.fmt.formatFloatScientific(value, .{}, writer); + }, + JsonType.JsonBool => |value| { + if(value) { + try writer.writeAll("true"); + } else { + try writer.writeAll("false"); + } + }, + JsonType.JsonNull => { + try writer.writeAll("null"); + }, + JsonType.JsonString, JsonType.JsonStringOwned => |value| { + const escaped = try escape(value, main.threadAllocator); + try writer.writeByte('\"'); + try writer.writeAll(escaped); + try writer.writeByte('\"'); + main.threadAllocator.free(escaped); + }, + JsonType.JsonArray => |array| { + try writer.writeByte('['); + for(array.items) |elem, i| { + if(i != 0) { + try writer.writeByte(','); + } + if(visualCharacters) try writer.writeByte('\n'); + if(visualCharacters) try writeTabs(writer, tabs + 1); + try recurseToString(elem, writer, tabs + 1, visualCharacters); + } + if(visualCharacters) try writer.writeByte('\n'); + if(visualCharacters) try writeTabs(writer, tabs); + try writer.writeByte(']'); + }, + JsonType.JsonObject => |obj| { + try writer.writeByte('{'); + var iterator = obj.iterator(); + var first: bool = true; + while(true) { + var elem = iterator.next() orelse break; + if(!first) { + try writer.writeByte(','); + } + if(visualCharacters) try writer.writeByte('\n'); + if(visualCharacters) try writeTabs(writer, tabs + 1); + try writer.writeByte('\"'); + try writer.writeAll(elem.key_ptr.*); + try writer.writeByte('\"'); + if(visualCharacters) try writer.writeByte(' '); + try writer.writeByte(':'); + if(visualCharacters) try writer.writeByte(' '); + + try recurseToString(elem.value_ptr.*, writer, tabs + 1, visualCharacters); + first = false; + } + if(visualCharacters) try writer.writeByte('\n'); + if(visualCharacters) try writeTabs(writer, tabs); + try writer.writeByte('}'); + }, + } + } + pub fn toString(json: JsonElement, allocator: Allocator) ![]const u8 { + var string = std.ArrayList(u8).init(allocator); + try recurseToString(json, string.writer(), 0, true); + return string.toOwnedSlice(); + } + + /// Ignores all the visual characters(spaces, tabs and newlines) and allows adding a custom prefix(which is for example required by networking). + pub fn toStringEfficient(json: JsonElement, allocator: Allocator, prefix: []const u8) ![]const u8 { + var string = std.ArrayList(u8).init(allocator); + try string.appendSlice(prefix); + try recurseToString(json, string.writer(), 0, false); + return string.toOwnedSlice(); + } }; pub fn parseFromString(allocator: Allocator, string: []const u8) JsonElement { diff --git a/src/main.zig b/src/main.zig index 0e83c6c2..e006f0b7 100644 --- a/src/main.zig +++ b/src/main.zig @@ -6,6 +6,7 @@ const chunk = @import("chunk.zig"); const graphics = @import("graphics.zig"); const renderer = @import("renderer.zig"); const network = @import("network.zig"); +const utils = @import("utils.zig"); const Vec2f = @import("vec.zig").Vec2f; @@ -172,6 +173,8 @@ pub fn main() !void { var conn2 = try network.Connection.init(conn, "127.0.0.1"); defer conn2.deinit(); + try network.Protocols.handShake.clientSide(conn2, "quanturmdoelvloper"); + c.glEnable(c.GL_CULL_FACE); c.glCullFace(c.GL_BACK); c.glEnable(c.GL_BLEND); diff --git a/src/network.zig b/src/network.zig index e5c19cd7..f4480adf 100644 --- a/src/network.zig +++ b/src/network.zig @@ -4,6 +4,9 @@ const Allocator = std.mem.Allocator; const main = @import("main.zig"); const game = @import("game.zig"); const settings = @import("settings.zig"); +const json = @import("json.zig"); +const JsonElement = json.JsonElement; +const utils = @import("utils.zig"); //TODO: Might want to use SSL or something similar to encode the message @@ -505,10 +508,152 @@ const UnconfirmedPacket = struct { id: u32, }; +fn addProtocol(comptime comptimeList: *[256]?*const fn(*Connection, []const u8) anyerror!void, comptime prot: type) type { + if(comptimeList[prot.id] == null and prot.id != 0 and prot.id != 0xff) { + comptimeList[prot.id] = prot.receive; + } else { + @compileError("Protocol id is already used."); + } + return prot; +} + +pub const Protocols = blk: { + comptime var comptimeList = [_]?*const fn(*Connection, []const u8) anyerror!void{null} ** 256; + const Protocols_struct = struct { + list: [256]?*const fn(*Connection, []const u8) anyerror!void, + keepAlive: u8 = 0, + important: u8 = 0xff, + handShake: type = addProtocol(&comptimeList, struct { + const id: u8 = 1; + const stepStart: u8 = 0; + const stepUserData: u8 = 1; + const stepAssets: u8 = 2; + const stepServerData: u8 = 3; + const stepComplete: u8 = 255; + + fn receive(conn: *Connection, data: []const u8) !void { + if(conn.handShakeState == data[0] - 1) { + conn.handShakeState = data[0]; + switch(data[0]) { + stepUserData => { + var jsonObject = json.parseFromString(main.threadAllocator, data[1..]); + defer jsonObject.free(main.threadAllocator); + var name = jsonObject.get([]const u8, "name", "unnamed"); + var version = jsonObject.get([]const u8, "version", "unknown"); + std.log.info("User {s} joined using version {s}.", .{name, version}); + + { + // TODO: Send the world data. + var path = try std.fmt.allocPrint(main.threadAllocator, "saves/{s}/assets/", .{"Development"}); // TODO: Use world name. + defer main.threadAllocator.free(path); + var dir = try std.fs.cwd().openIterableDir(path, .{}); + defer dir.close(); + var arrayList = std.ArrayList(u8).init(main.threadAllocator); + defer arrayList.deinit(); + try arrayList.append(stepAssets); + try utils.Compression.pack(dir, arrayList.writer()); + std.log.debug("{any}", .{arrayList.items}); + try conn.sendImportant(id, arrayList.items); + try conn.flush(); + } + + // TODO: +// JsonObject jsonObject = new JsonObject(); +// ((User)conn).initPlayer(name); +// jsonObject.put("player", ((User)conn).player.save()); +// jsonObject.put("player_id", ((User)conn).player.id); +// jsonObject.put("blockPalette", Server.world.blockPalette.save()); +// JsonObject spawn = new JsonObject(); +// spawn.put("x", Server.world.spawn.x); +// spawn.put("y", Server.world.spawn.y); +// spawn.put("z", Server.world.spawn.z); +// jsonObject.put("spawn", spawn); +// byte[] string = jsonObject.toString().getBytes(StandardCharsets.UTF_8); +// byte[] outData = new byte[string.length + 1]; +// outData[0] = STEP_SERVER_DATA; +// System.arraycopy(string, 0, outData, 1, string.length); +// state.put(conn, STEP_SERVER_DATA); +// conn.sendImportant(this, outData); +// state.remove(conn); // Handshake is done. +// conn.handShakeComplete = true; +// synchronized(conn) { // Notify the waiting server thread. +// conn.notifyAll(); +// } + }, + stepAssets => { + std.log.info("Received assets.", .{}); + std.fs.cwd().deleteTree("serverAssets") catch {}; // Delete old assets. + try std.fs.cwd().makePath("serverAssets"); + try utils.Compression.unpack(try std.fs.cwd().openDir("serverAssets", .{}), data[1..]); + }, + stepServerData => { + var jsonObject = json.parseFromString(main.threadAllocator, data[1..]); + defer jsonObject.free(main.threadAllocator); + // TODO: ((ServerConnection)conn).world.finishHandshake(json); + conn.handShakeState = stepComplete; + conn.handShakeWaiting.broadcast(); // Notify the waiting client thread. + }, + stepComplete => { + + }, + else => { + std.log.err("Unknown state in HandShakeProtocol {}", .{data[0]}); + }, + } + } else { + // Ignore packages that refer to an unexpected state. Normally those might be packages that were resent by the other side. + } + _ = data; + } + + pub fn serverSide(conn: *Connection) void { + conn.handShakeState = stepStart; + } + + pub fn clientSide(conn: *Connection, name: []const u8) !void { + var jsonObject = JsonElement{.JsonObject=try main.threadAllocator.create(std.StringHashMap(JsonElement))}; + defer jsonObject.free(main.threadAllocator); + jsonObject.JsonObject.* = std.StringHashMap(JsonElement).init(main.threadAllocator); + try jsonObject.JsonObject.put(try main.threadAllocator.dupe(u8, "version"), JsonElement{.JsonString=settings.version}); + try jsonObject.JsonObject.put(try main.threadAllocator.dupe(u8, "name"), JsonElement{.JsonString=name}); + var prefix = [1]u8 {stepUserData}; + var data = try jsonObject.toStringEfficient(main.threadAllocator, &prefix); + defer main.threadAllocator.free(data); + try conn.sendImportant(id, data); + conn.handShakeState = stepUserData; + + conn.mutex.lock(); + conn.handShakeWaiting.wait(&conn.mutex); + conn.mutex.unlock(); + } + }), + }; + break :blk Protocols_struct{.list = comptimeList}; +}; +//public final class Protocols { +// public static final int[] bytesReceived = new int[256]; +// public static final int[] packetsReceived = new int[256]; +// +// public static final byte KEEP_ALIVE = 0; +// public static final byte IMPORTANT_PACKET = (byte)0xff; +// public static final HandshakeProtocol HANDSHAKE = new HandshakeProtocol(); +// public static final ChunkRequestProtocol CHUNK_REQUEST = new ChunkRequestProtocol(); +// public static final ChunkTransmissionProtocol CHUNK_TRANSMISSION = new ChunkTransmissionProtocol(); +// public static final PlayerPositionProtocol PLAYER_POSITION = new PlayerPositionProtocol(); +// public static final DisconnectProtocol DISCONNECT = new DisconnectProtocol(); +// public static final EntityPositionProtocol ENTITY_POSITION = new EntityPositionProtocol(); +// public static final BlockUpdateProtocol BLOCK_UPDATE = new BlockUpdateProtocol(); +// public static final EntityProtocol ENTITY = new EntityProtocol(); +// public static final GenericUpdateProtocol GENERIC_UPDATE = new GenericUpdateProtocol(); +// public static final ChatProtocol CHAT = new ChatProtocol(); +//} + const Protocol = struct { id: u8, const keepAlive: u8 = 0; const important: u8 = 0xff; + + }; // TODO @@ -535,7 +680,7 @@ pub const Connection = struct { messageID: u32 = 0, unconfirmedPackets: std.ArrayList(UnconfirmedPacket) = undefined, receivedPackets: [3]std.ArrayList(u32) = undefined, - lastReceivedPackets: [65536]?[]const u8 = undefined, + lastReceivedPackets: [65536]?[]const u8 = [_]?[]const u8{null} ** 65536, lastIndex: u32 = 0, lastIncompletePacket: u32 = 0, @@ -545,7 +690,8 @@ pub const Connection = struct { otherKeepAliveReceived: u32 = 0, disconnected: bool = false, - handShakeComplete: bool = false, + handShakeState: u8 = Protocols.handShake.stepStart, + handShakeWaiting: std.Thread.Condition = std.Thread.Condition{}, lastConnection: i64, mutex: std.Thread.Mutex = std.Thread.Mutex{}, @@ -618,40 +764,40 @@ pub const Connection = struct { self.streamPosition = importantHeaderSize; } - fn writeByteToStream(self: *Connection, data: u8) void { + fn writeByteToStream(self: *Connection, data: u8) !void { self.streamBuffer[self.streamPosition] = data; self.streamPosition += 1; - if(self.streamPosition == self.streamBuffer.length) { - self.flush(); + if(self.streamPosition == self.streamBuffer.len) { + try self.flush(); } } - pub fn sendImportant(self: *Connection, source: Protocol, data: []const u8) void { + pub fn sendImportant(self: *Connection, id: u8, data: []const u8) !void { self.mutex.lock(); defer self.mutex.unlock(); if(self.disconnected) return; - self.writeByteToStream(source.id); + try self.writeByteToStream(id); var processedLength = data.len; while(processedLength > 0x7f) { - self.writeByteToStream(@intCast(u8, processedLength & 0x7f) | 0x80); + try self.writeByteToStream(@intCast(u8, processedLength & 0x7f) | 0x80); processedLength >>= 7; } - self.writeByteToStream(@intCast(u8, processedLength & 0x7f)); + try self.writeByteToStream(@intCast(u8, processedLength & 0x7f)); var remaining: []const u8 = data; while(remaining.len != 0) { var copyableSize = @minimum(remaining.len, self.streamBuffer.len - self.streamPosition); - std.mem.copy(u8, self.streamBuffer, remaining[0..copyableSize]); + std.mem.copy(u8, self.streamBuffer[self.streamPosition..], remaining[0..copyableSize]); remaining = remaining[copyableSize..]; - self.streamPosition += copyableSize; + self.streamPosition += @intCast(u32, copyableSize); if(self.streamPosition == self.streamBuffer.len) { - self.flush(); + try self.flush(); } } } - pub fn sendUnimportant(self: *Connection, source: Protocol, data: []const u8) !void { + pub fn sendUnimportant(self: *Connection, id: u8, data: []const u8) !void { self.mutex.lock(); defer self.mutex.unlock(); @@ -659,7 +805,7 @@ pub const Connection = struct { std.debug.assert(data.len + 1 < maxPacketSize); var fullData = try main.threadAllocator.alloc(u8, data.len + 1); defer main.threadAllocator.free(fullData); - fullData[0] = source.id; + fullData[0] = id; std.mem.copy(u8, fullData[1..], data); self.manager.send(fullData, self.remoteAddress); } @@ -672,14 +818,17 @@ pub const Connection = struct { self.lastKeepAliveReceived = std.mem.readIntBig(u32, data[4..8]); var remaining: []const u8 = data[8..]; while(remaining.len >= 8) { - var start = std.mem.readIntBig(u32, data[0..4]); - var len = std.mem.readIntBig(u32, data[4..8]); + var start = std.mem.readIntBig(u32, remaining[0..4]); + var len = std.mem.readIntBig(u32, remaining[4..8]); + remaining = remaining[8..]; var j: usize = 0; - while(j < self.unconfirmedPackets.items.len): (j += 1) { + while(j < self.unconfirmedPackets.items.len) { var diff = self.unconfirmedPackets.items[j].id -% start; if(diff < len) { + self.allocator.free(self.unconfirmedPackets.items[j].data); _ = self.unconfirmedPackets.swapRemove(j); - j -= 1; + } else { + j += 1; } } } @@ -751,7 +900,7 @@ pub const Connection = struct { // Resend packets that didn't receive confirmation within the last 2 keep-alive signals. for(self.unconfirmedPackets.items) |*packet| { - if(self.lastKeepAliveReceived - packet.lastKeepAliveSentBefore >= 2) { + if(self.lastKeepAliveReceived -% packet.lastKeepAliveSentBefore >= 2) { packetsSent += 1; packetsResent += 1; try self.manager.send(packet.data, self.remoteAddress); @@ -783,19 +932,14 @@ pub const Connection = struct { } fn collectPackets(self: *Connection) !void { - self.mutex.lock(); - defer self.mutex.unlock(); - while(true) { var id = self.lastIncompletePacket; var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return; var newIndex = self.lastIndex; var protocol = receivedPacket[newIndex]; newIndex += 1; - // TODO: - _ = protocol; -// if(Cubyz.world == null && protocol != Protocols.HANDSHAKE.id) -// return; + if(game.world == null and protocol != Protocols.handShake.id) + return; // Determine the next packet length: var len: u32 = 0; @@ -830,7 +974,7 @@ pub const Connection = struct { var remaining = data[0..]; while(remaining.len != 0) { dataAvailable = @minimum(self.lastReceivedPackets[id & 65535].?.len - newIndex, remaining.len); - std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..dataAvailable]); + std.mem.copy(u8, remaining, self.lastReceivedPackets[id & 65535].?[newIndex..newIndex + dataAvailable]); newIndex += @intCast(u32, dataAvailable); remaining = remaining[dataAvailable..]; if(newIndex == self.lastReceivedPackets[id & 65535].?.len) { @@ -845,23 +989,27 @@ pub const Connection = struct { self.lastIndex = newIndex; // TODO: // Protocols.bytesReceived[protocol & 0xff] += data.length + 1; -// Protocols.list[protocol].receive(this, data, 0, data.length); + if(Protocols.list[protocol]) |prot| { + try prot(self, data); + } else { + std.log.warn("Received unknown protocol width id {}", .{protocol}); + } } } pub fn receive(self: *Connection, data: []const u8) !void { const protocol = data[0]; // TODO: - //if(!self.handShakeComplete and protocol != Protocols.HANDSHAKE.id and protocol != Protocol.KEEP_ALIVE and protocol != Protocol.important) { - // return; // Reject all non-handshake packets until the handshake is done. - //} + if(self.handShakeState != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) { + return; // Reject all non-handshake packets until the handshake is done. + } self.lastConnection = std.time.milliTimestamp(); // TODO: // Protocols.bytesReceived[protocol & 0xff] += len + 20 + 8; // Including IP header and udp header // Protocols.packetsReceived[protocol & 0xff]++; if(protocol == Protocol.important) { var id = std.mem.readIntBig(u32, data[1..5]); - if(self.handShakeComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected. + if(self.handShakeState == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected. // TODO: // if(this instanceof User) { // Server.disconnect((User)this); diff --git a/src/settings.zig b/src/settings.zig index 7610f9ba..2032763d 100644 --- a/src/settings.zig +++ b/src/settings.zig @@ -1,4 +1,6 @@ pub const defaultPort: u16 = 47649; -pub const connectionTimeout = 60000; \ No newline at end of file +pub const connectionTimeout = 60000; + +pub const version = "0.12.0"; \ No newline at end of file diff --git a/src/utils.zig b/src/utils.zig new file mode 100644 index 00000000..6782bff0 --- /dev/null +++ b/src/utils.zig @@ -0,0 +1,110 @@ +const std = @import("std"); + +const main = @import("main.zig"); + +pub const Compression = struct { + pub fn pack(sourceDir: std.fs.IterableDir, writer: anytype) !void { + var comp = try std.compress.deflate.compressor(main.threadAllocator, writer, .{.level = .no_compression}); + defer comp.deinit(); + var walker = try sourceDir.walk(main.threadAllocator); + defer walker.deinit(); + + while(try walker.next()) |entry| { + if(entry.kind == .File) { + var relPath = entry.path; + var len: [4]u8 = undefined; + std.mem.writeIntBig(u32, &len, @intCast(u32, relPath.len)); + _ = try comp.write(&len); + _ = try comp.write(relPath); + + var file = try sourceDir.dir.openFile(relPath, .{}); + defer file.close(); + var fileData = try file.readToEndAlloc(main.threadAllocator, std.math.maxInt(u32)); + defer main.threadAllocator.free(fileData); + + std.mem.writeIntBig(u32, &len, @intCast(u32, fileData.len)); + _ = try comp.write(&len); + _ = try comp.write(fileData); + } + } + try comp.close(); + } + + pub fn unpack(outDir: std.fs.Dir, input: []const u8) !void { + var stream = std.io.fixedBufferStream(input); + var decomp = try std.compress.deflate.decompressor(main.threadAllocator, stream.reader(), null); + defer decomp.deinit(); + var reader = decomp.reader(); + const _data = try reader.readAllAlloc(main.threadAllocator, std.math.maxInt(usize)); + defer main.threadAllocator.free(_data); + var data = _data; + while(data.len != 0) { + var len = std.mem.readIntBig(u32, data[0..4]); + data = data[4..]; + var path = data[0..len]; + data = data[len..]; + len = std.mem.readIntBig(u32, data[0..4]); + data = data[4..]; + var fileData = data[0..len]; + data = data[len..]; + + var splitter = std.mem.splitBackwards(u8, path, "/"); + _ = splitter.first(); + try outDir.makePath(splitter.rest()); + var file = try outDir.createFile(path, .{}); + defer file.close(); + try file.writeAll(fileData); + } + } +// public static void pack(String sourceDirPath, OutputStream outputstream){ +// try { +// DeflaterOutputStream compressedOut = new DeflaterOutputStream(outputstream); +// Path path = Paths.get(sourceDirPath); +// Files.walk(path) +// .filter(p -> !Files.isDirectory(p)) // potential bug +// .forEach(p -> { +// String relPath = path.relativize(p).toString(); +// try { +// byte[] strData = relPath.getBytes(StandardCharsets.UTF_8); +// byte[] len = new byte[4]; +// Bits.putInt(len, 0, strData.length); +// compressedOut.write(len); +// compressedOut.write(strData); +// byte[] file = Files.readAllBytes(p); +// Bits.putInt(len, 0, file.length); +// compressedOut.write(len); +// compressedOut.write(file); +// } catch (IOException e) { +// Logger.error(e); +// } +// }); +// compressedOut.close(); +// }catch(IOException exception){ +// Logger.error(exception); +// } +// } +// public static void unpack(String outputFolderPath, InputStream inputStream){ +// try { +// File outputFolder = new File(outputFolderPath); +// if (!outputFolder.exists()) { +// outputFolder.mkdir(); +// } +// InflaterInputStream compressedIn = new InflaterInputStream(inputStream); +// while(compressedIn.available() != 0) { +// byte[] len = compressedIn.readNBytes(4); +// byte[] pathBytes = compressedIn.readNBytes(Bits.getInt(len ,0)); +// String path = new String(pathBytes, StandardCharsets.UTF_8); +// String filePath = outputFolder.getAbsolutePath() + File.separator + path; +// len = compressedIn.readNBytes(4); +// byte[] fileBytes = compressedIn.readNBytes(Bits.getInt(len ,0)); +// new File(filePath).getParentFile().mkdirs(); +// BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath)); +// bos.write(fileBytes, 0, fileBytes.length); +// bos.close(); +// } +// compressedIn.close(); +// }catch (Exception e){ +// Logger.error(e); +// } +// } +}; \ No newline at end of file