From 838a6231c1a896491e2c2bd62b3c0fe79bde25ee Mon Sep 17 00:00:00 2001 From: Bixilon Date: Thu, 16 Jul 2020 01:31:37 +0200 Subject: [PATCH] remove packet thread (only 1 network thread now) --- .../minosoft/protocol/network/Connection.java | 1 - .../minosoft/protocol/network/Network.java | 236 +++++++----------- .../java/de/bixilon/minosoft/util/Util.java | 3 +- 3 files changed, 98 insertions(+), 142 deletions(-) diff --git a/src/main/java/de/bixilon/minosoft/protocol/network/Connection.java b/src/main/java/de/bixilon/minosoft/protocol/network/Connection.java index c46888e89..57a60520b 100644 --- a/src/main/java/de/bixilon/minosoft/protocol/network/Connection.java +++ b/src/main/java/de/bixilon/minosoft/protocol/network/Connection.java @@ -95,7 +95,6 @@ public class Connection { switch (state) { case HANDSHAKING: // connection established, starting threads and logging in - network.startPacketThread(); startHandlingThread(); ConnectionState next = ((reason == ConnectionReason.CONNECT) ? ConnectionState.LOGIN : ConnectionState.STATUS); network.sendPacket(new PacketHandshake(getHost(), getPort(), next, (next == ConnectionState.STATUS) ? -1 : getVersion().getVersionNumber())); diff --git a/src/main/java/de/bixilon/minosoft/protocol/network/Network.java b/src/main/java/de/bixilon/minosoft/protocol/network/Network.java index 2ce8a68ba..9efa829a9 100644 --- a/src/main/java/de/bixilon/minosoft/protocol/network/Network.java +++ b/src/main/java/de/bixilon/minosoft/protocol/network/Network.java @@ -38,15 +38,12 @@ import java.util.List; public class Network { final Connection connection; final List queue; - final List binQueue; - final List binQueueIn; Thread socketThread; int compressionThreshold = -1; Socket socket; OutputStream outputStream; InputStream cipherInputStream; InputStream inputStream; - Thread packetThread; boolean encryptionEnabled = false; SecretKey secretKey; boolean connected; @@ -54,8 +51,6 @@ public class Network { public Network(Connection c) { this.connection = c; this.queue = new ArrayList<>(); - this.binQueue = new ArrayList<>(); - this.binQueueIn = new ArrayList<>(); } public void connect() { @@ -86,21 +81,48 @@ public class Network { break; } - while (binQueue.size() > 0) { - // something to send it, send it - byte[] b = binQueue.get(0); - // send, flush and remove - outputStream.write(b); + while (queue.size() > 0) { + ServerboundPacket p = queue.get(0); + p.log(); + queue.remove(0); + byte[] data = p.write(connection.getVersion()).getOutBytes(); + if (compressionThreshold != -1) { + // compression is enabled + // check if there is a need to compress it and if so, do it! + OutByteBuffer outRawBuffer = new OutByteBuffer(connection.getVersion()); + if (data.length >= compressionThreshold) { + // compress it + byte[] compressed = Util.compress(data); + OutByteBuffer buffer = new OutByteBuffer(connection.getVersion()); + buffer.writeVarInt(compressed.length); + buffer.writeBytes(compressed); + outRawBuffer.writeVarInt(buffer.getBytes().size()); + outRawBuffer.writeBytes(buffer.getOutBytes()); + } else { + outRawBuffer.writeVarInt(data.length + 1); // 1 for the compressed length (0) + outRawBuffer.writeVarInt(0); + outRawBuffer.writeBytes(data); + } + data = outRawBuffer.getOutBytes(); + } else { + // append packet length + OutByteBuffer bufferWithLengthPrefix = new OutByteBuffer(connection.getVersion()); + bufferWithLengthPrefix.writeVarInt(data.length); + bufferWithLengthPrefix.writeBytes(data); + data = bufferWithLengthPrefix.getOutBytes(); + } + + outputStream.write(data); outputStream.flush(); - binQueue.remove(0); - - // check if should enable encryption - if (!encryptionEnabled && secretKey != null) { + if (p instanceof PacketEncryptionResponse) { + // enable encryption + secretKey = ((PacketEncryptionResponse) p).getSecretKey(); enableEncryption(secretKey); } } + // everything sent for now, waiting for data if (inputStream.available() > 0) { // available seems not to work in CipherInputStream @@ -118,9 +140,65 @@ public class Network { } } while ((read & 0b10000000) != 0); - byte[] raw = cipherInputStream.readNBytes(length); - binQueueIn.add(raw); - packetThread.interrupt(); + byte[] data = cipherInputStream.readNBytes(length); + + + if (compressionThreshold != -1) { + // compression is enabled + // check if there is a need to decompress it and if so, do it! + InByteBuffer rawBuffer = new InByteBuffer(data, connection.getVersion()); + int packetSize = rawBuffer.readVarInt(); + byte[] left = rawBuffer.readBytesLeft(); + if (packetSize == 0) { + // no need + data = left; + } else { + // need to decompress data + data = Util.decompress(left, connection.getVersion()).readBytesLeft(); + } + } + + InPacketBuffer inPacketBuffer = new InPacketBuffer(data, connection.getVersion()); + try { + Packets.Clientbound p = connection.getVersion().getProtocol().getPacketByCommand(connection.getConnectionState(), inPacketBuffer.getCommand()); + Class clazz = Protocol.getPacketByPacket(p); + + if (clazz == null) { + Log.warn(String.format("[IN] Received unknown packet (id=0x%x, name=%s, length=%d, dataLength=%d, version=%s, state=%s)", inPacketBuffer.getCommand(), ((p != null) ? p.name() : "UNKNOWN"), inPacketBuffer.getLength(), inPacketBuffer.getBytesLeft(), connection.getVersion().name(), connection.getConnectionState().name())); + continue; + } + try { + ClientboundPacket packet = clazz.getConstructor().newInstance(); + boolean success; + if (packet instanceof PacketChunkData) { + // this packets need to know if we are in an dimension with skylight... + success = ((PacketChunkData) (packet)).read(inPacketBuffer, connection.getPlayer().getWorld().getDimension()); + } else { + success = packet.read(inPacketBuffer); + } + if (inPacketBuffer.getBytesLeft() > 0 || !success) { + // warn not all data used + Log.warn(String.format("[IN] Could not parse packet %s (used=%d, available=%d, total=%d, success=%s)", ((p != null) ? p.name() : "null"), inPacketBuffer.getPosition(), inPacketBuffer.getBytesLeft(), inPacketBuffer.getLength(), success)); + + continue; + } + + if (packet instanceof PacketLoginSuccess) { + // login was okay, setting play status to avoid miss timing issues + connection.setConnectionState(ConnectionState.PLAY); + } else if (packet instanceof PacketLoginSetCompression) { + // instantly set compression. because handling is to slow... + compressionThreshold = ((PacketLoginSetCompression) packet).getThreshold(); + } + connection.handle(packet); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + // safety first, but will not occur + e.printStackTrace(); + } + } catch (Exception e) { + Log.protocol("Received broken packet!"); + e.printStackTrace(); + } } Util.sleep(1); @@ -138,130 +216,10 @@ public class Network { socketThread.start(); } - public void startPacketThread() { - // compressed data, makes packets to binary data - // read data - // safety first, but will not occur - packetThread = new Thread(() -> { - // compressed data, makes packets to binary data - while (connection.getConnectionState() != ConnectionState.DISCONNECTING) { - - while (queue.size() > 0) { - ServerboundPacket p = queue.get(0); - p.log(); - queue.remove(0); - byte[] data = p.write(connection.getVersion()).getOutBytes(); - if (compressionThreshold != -1) { - // compression is enabled - // check if there is a need to compress it and if so, do it! - OutByteBuffer outRawBuffer = new OutByteBuffer(connection.getVersion()); - if (data.length >= compressionThreshold) { - // compress it - byte[] compressed = Util.compress(data); - OutByteBuffer buffer = new OutByteBuffer(connection.getVersion()); - buffer.writeVarInt(compressed.length); - buffer.writeBytes(compressed); - outRawBuffer.writeVarInt(buffer.getBytes().size()); - outRawBuffer.writeBytes(buffer.getOutBytes()); - } else { - outRawBuffer.writeVarInt(data.length + 1); // 1 for the compressed length (0) - outRawBuffer.writeVarInt(0); - outRawBuffer.writeBytes(data); - } - data = outRawBuffer.getOutBytes(); - } else { - // append packet length - OutByteBuffer bufferWithLengthPrefix = new OutByteBuffer(connection.getVersion()); - bufferWithLengthPrefix.writeVarInt(data.length); - bufferWithLengthPrefix.writeBytes(data); - data = bufferWithLengthPrefix.getOutBytes(); - } - - - binQueue.add(data); - if (p instanceof PacketEncryptionResponse) { - // enable encryption - secretKey = ((PacketEncryptionResponse) p).getSecretKey(); - } - } - while (binQueueIn.size() > 0) { - - // read data - byte[] data = binQueueIn.get(0); - binQueueIn.remove(0); - if (compressionThreshold != -1) { - // compression is enabled - // check if there is a need to decompress it and if so, do it! - InByteBuffer rawBuffer = new InByteBuffer(data, connection.getVersion()); - int packetSize = rawBuffer.readVarInt(); - byte[] left = rawBuffer.readBytesLeft(); - if (packetSize == 0) { - // no need - data = left; - } else { - // need to decompress data - data = Util.decompress(left, connection.getVersion()).readBytesLeft(); - } - } - - InPacketBuffer inPacketBuffer = new InPacketBuffer(data, connection.getVersion()); - try { - Packets.Clientbound p = connection.getVersion().getProtocol().getPacketByCommand(connection.getConnectionState(), inPacketBuffer.getCommand()); - Class clazz = Protocol.getPacketByPacket(p); - - if (clazz == null) { - Log.warn(String.format("[IN] Received unknown packet (id=0x%x, name=%s, length=%d, dataLength=%d, version=%s, state=%s)", inPacketBuffer.getCommand(), ((p != null) ? p.name() : "UNKNOWN"), inPacketBuffer.getLength(), inPacketBuffer.getBytesLeft(), connection.getVersion().name(), connection.getConnectionState().name())); - continue; - } - try { - ClientboundPacket packet = clazz.getConstructor().newInstance(); - boolean success; - if (packet instanceof PacketChunkData) { - // this packets need to know if we are in an dimension with skylight... - success = ((PacketChunkData) (packet)).read(inPacketBuffer, connection.getPlayer().getWorld().getDimension()); - } else { - success = packet.read(inPacketBuffer); - } - if (inPacketBuffer.getBytesLeft() > 0 || !success) { - // warn not all data used - Log.warn(String.format("[IN] Could not parse packet %s (used=%d, available=%d, total=%d, success=%s)", ((p != null) ? p.name() : "null"), inPacketBuffer.getPosition(), inPacketBuffer.getBytesLeft(), inPacketBuffer.getLength(), success)); - - continue; - } - - if (packet instanceof PacketLoginSuccess) { - // login was okay, setting play status to avoid miss timing issues - connection.setConnectionState(ConnectionState.PLAY); - } else if (packet instanceof PacketLoginSetCompression) { - // instantly set compression. because handling is to slow... - compressionThreshold = ((PacketLoginSetCompression) packet).getThreshold(); - } - connection.handle(packet); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - // safety first, but will not occur - e.printStackTrace(); - } - } catch (Exception e) { - Log.protocol("Received broken packet!"); - e.printStackTrace(); - } - } - try { - // sleep, wait for an interrupt from other thread - //noinspection BusyWait - Thread.sleep(100); - } catch (InterruptedException ignored) { - } - - } - }); - packetThread.setName("Packet-Thread"); - packetThread.start(); - } public void sendPacket(ServerboundPacket p) { queue.add(p); - packetThread.interrupt(); + socketThread.interrupt(); } public void enableEncryption(SecretKey secretKey) { @@ -278,7 +236,7 @@ public class Network { } public void disconnect() { - packetThread.interrupt(); + socketThread.interrupt(); } } diff --git a/src/main/java/de/bixilon/minosoft/util/Util.java b/src/main/java/de/bixilon/minosoft/util/Util.java index c332c3173..f1bb5edb7 100644 --- a/src/main/java/de/bixilon/minosoft/util/Util.java +++ b/src/main/java/de/bixilon/minosoft/util/Util.java @@ -35,8 +35,7 @@ public class Util { public static void sleep(int ms) { try { Thread.sleep(ms); - } catch (InterruptedException e) { - e.printStackTrace(); + } catch (InterruptedException ignored) { } }