Implement asynchronous sockets

This commit is contained in:
Mitchell Kutchuk 2015-06-20 19:43:28 -07:00
parent 939a6dc79c
commit 4225124546
17 changed files with 954 additions and 205 deletions

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
namespace TrueCraft.API.Networking
{
@ -6,8 +8,9 @@ namespace TrueCraft.API.Networking
{
int ProtocolVersion { get; }
ConcurrentDictionary<object, IPacketSegmentProcessor> Processors { get; }
void RegisterPacketType<T>(bool clientbound = true, bool serverbound = true) where T : IPacket;
IPacket ReadPacket(IMinecraftStream stream, bool serverbound = true);
IEnumerable<IPacket> ReadPackets(object key, byte[] buffer, int offset, int length, bool serverbound = true);
void WritePacket(IMinecraftStream stream, IPacket packet);
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace TrueCraft.API.Networking
{
public interface IPacketSegmentProcessor
{
bool ProcessNextSegment(byte[] nextSegment, int offset, int len, out IPacket packet);
}
}

View File

@ -19,9 +19,14 @@ namespace TrueCraft.API.Networking
IMultiplayerServer Server { get; }
bool EnableLogging { get; set; }
bool Disconnected { get; }
bool Load();
void Save();
void QueuePacket(IPacket packet);
void Disconnect();
void SendMessage(string message);
void Log(string message, params object[] parameters);
void OpenWindow(IWindow window);

View File

@ -32,6 +32,8 @@ namespace TrueCraft.API.Server
bool BlockUpdatesEnabled { get; set; }
bool EnableClientLogging { get; set; }
bool ShuttingDown { get; }
void Start(IPEndPoint endPoint);
void Stop();
void RegisterPacketHandler(byte packetId, PacketHandler handler);
@ -41,6 +43,8 @@ namespace TrueCraft.API.Server
IEntityManager GetEntityManagerForWorld(IWorld world);
void SendMessage(string message, params object[] parameters);
void DisconnectClient(IRemoteClient client);
bool PlayerIsWhitelisted(string client);
bool PlayerIsBlacklisted(string client);
bool PlayerIsOp(string client);

View File

@ -39,6 +39,7 @@
<Compile Include="Coordinates3D.cs" />
<Compile Include="ChatFormat.cs" />
<Compile Include="IAccessConfiguration.cs" />
<Compile Include="Networking\IPacketSegmentProcessor.cs" />
<Compile Include="PlantSpecies.cs" />
<Compile Include="OreTypes.cs" />
<Compile Include="ToolMaterial.cs" />

View File

@ -12,13 +12,14 @@ using TrueCraft.Core.Logic;
using TrueCraft.API.Entities;
using TrueCraft.API;
using System.ComponentModel;
using System.IO;
using TrueCraft.Core;
namespace TrueCraft.Client
{
public delegate void PacketHandler(IPacket packet, MultiplayerClient client);
public class MultiplayerClient : IAABBEntity, INotifyPropertyChanged // TODO: Make IMultiplayerClient and so on
public class MultiplayerClient : IAABBEntity, INotifyPropertyChanged, IDisposable // TODO: Make IMultiplayerClient and so on
{
public event EventHandler<ChatMessageEventArgs> ChatMessage;
public event EventHandler<ChunkEventArgs> ChunkModified;
@ -26,26 +27,40 @@ namespace TrueCraft.Client
public event EventHandler<ChunkEventArgs> ChunkUnloaded;
public event PropertyChangedEventHandler PropertyChanged;
private long _connected;
public TrueCraftUser User { get; set; }
public ReadOnlyWorld World { get; private set; }
public PhysicsEngine Physics { get; set; }
public bool LoggedIn { get; internal set; }
public bool Connected
{
get
{
return Interlocked.Read(ref _connected) == 1;
}
}
private TcpClient Client { get; set; }
private IMinecraftStream Stream { get; set; }
private PacketReader PacketReader { get; set; }
private BlockingCollection<IPacket> PacketQueue { get; set; }
private Thread NetworkWorker { get; set; }
private readonly PacketHandler[] PacketHandlers;
private SemaphoreSlim _sem = new SemaphoreSlim(1, 1);
private readonly CancellationTokenSource _cancel;
private SocketAsyncEventArgsPool SocketPool { get; set; }
public MultiplayerClient(TrueCraftUser user)
{
User = user;
Client = new TcpClient();
PacketQueue = new BlockingCollection<IPacket>(new ConcurrentQueue<IPacket>());
PacketReader = new PacketReader();
PacketReader.RegisterCorePackets();
NetworkWorker = new Thread(new ThreadStart(DoNetwork));
//NetworkWorker = new Thread(new ThreadStart(DoNetwork));
PacketHandlers = new PacketHandler[0x100];
Handlers.PacketHandlers.RegisterHandlers(this);
World = new ReadOnlyWorld();
@ -53,6 +68,9 @@ namespace TrueCraft.Client
repo.DiscoverBlockProviders();
World.World.BlockRepository = repo;
Physics = new PhysicsEngine(World, repo);
SocketPool = new SocketAsyncEventArgsPool(100, 200, 65536);
_connected = 0;
_cancel = new CancellationTokenSource();
}
public void RegisterPacketHandler(byte packetId, PacketHandler handler)
@ -62,58 +80,122 @@ namespace TrueCraft.Client
public void Connect(IPEndPoint endPoint)
{
Client.BeginConnect(endPoint.Address, endPoint.Port, ConnectionComplete, null);
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += Connection_Completed;
args.RemoteEndPoint = endPoint;
if (!Client.Client.ConnectAsync(args))
Connection_Completed(this, args);
}
private void Connection_Completed(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
Interlocked.CompareExchange(ref _connected, 1, 0);
Physics.AddEntity(this);
StartReceive();
QueuePacket(new HandshakePacket(User.Username));
}
else
{
throw new Exception("Could not connect to server!");
}
}
public void Disconnect()
{
NetworkWorker.Abort();
new DisconnectPacket("Disconnecting").WritePacket(Stream);
Stream.BaseStream.Flush();
if (!Connected)
return;
Interlocked.CompareExchange(ref _connected, 0, 1);
QueuePacket(new DisconnectPacket("Disconnecting"));
Client.Client.Shutdown(SocketShutdown.Send);
Client.Close();
_cancel.Cancel();
}
public void QueuePacket(IPacket packet)
{
PacketQueue.Add(packet);
}
if (!Connected || (Client != null && !Client.Connected))
return;
private void ConnectionComplete(IAsyncResult result)
{
Client.EndConnect(result);
Stream = new MinecraftStream(new BufferedStream(Client.GetStream()));
NetworkWorker.Start();
Physics.AddEntity(this);
QueuePacket(new HandshakePacket(User.Username));
}
private void DoNetwork()
{
bool idle = true;
while (true)
using (MemoryStream writeStream = new MemoryStream())
{
IPacket packet;
DateTime limit = DateTime.Now.AddMilliseconds(500);
while (Client.Available != 0 && DateTime.Now < limit)
using (MinecraftStream ms = new MinecraftStream(writeStream))
{
ms.WriteUInt8(packet.ID);
packet.WritePacket(ms);
}
byte[] buffer = writeStream.ToArray();
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += Operation_Completed;
args.SetBuffer(buffer, 0, buffer.Length);
if (Client != null && !Client.Client.SendAsync(args))
Operation_Completed(this, args);
}
}
private void StartReceive()
{
SocketAsyncEventArgs args = SocketPool.Get();
args.Completed += Operation_Completed;
if (!Client.Client.ReceiveAsync(args))
Operation_Completed(this, args);
}
private void Operation_Completed(object sender, SocketAsyncEventArgs e)
{
e.Completed -= Operation_Completed;
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessNetwork(e);
SocketPool.Add(e);
break;
case SocketAsyncOperation.Send:
e.SetBuffer(null, 0, 0);
break;
}
}
private void ProcessNetwork(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
{
SocketAsyncEventArgs newArgs = SocketPool.Get();
newArgs.Completed += Operation_Completed;
if (Client != null && !Client.Client.ReceiveAsync(newArgs))
Operation_Completed(this, newArgs);
_sem.Wait(_cancel.Token);
var packets = PacketReader.ReadPackets(this, e.Buffer, e.Offset, e.BytesTransferred, false);
foreach (IPacket packet in packets)
{
idle = false;
packet = PacketReader.ReadPacket(Stream, false);
if (PacketHandlers[packet.ID] != null)
PacketHandlers[packet.ID](packet, this);
}
limit = DateTime.Now.AddMilliseconds(500);
while (PacketQueue.Any() && DateTime.Now < limit)
{
idle = false;
if (PacketQueue.TryTake(out packet, 100))
{
PacketReader.WritePacket(Stream, packet);
Stream.BaseStream.Flush();
}
}
if (idle)
Thread.Sleep(100);
if (_sem != null)
_sem.Release();
}
else
{
Disconnect();
}
}
@ -224,5 +306,29 @@ namespace TrueCraft.Client
}
#endregion
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
Disconnect();
_sem.Dispose();
}
_sem = null;
}
~MultiplayerClient()
{
Dispose(false);
}
}
}

View File

@ -0,0 +1,135 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace TrueCraft.Core.Collections
{
public class ByteArraySegment : ICollection<byte>
{
private readonly byte[] _array;
private readonly int _start;
private readonly int _count;
public ByteArraySegment(byte[] array, int start, int count)
{
_array = array;
_start = start;
_count = count;
}
public void Add(byte item)
{
throw new NotImplementedException();
}
public void Clear()
{
throw new NotImplementedException();
}
public bool Contains(byte item)
{
return _array.Contains(item);
}
public void CopyTo(byte[] target, int index)
{
Buffer.BlockCopy(_array, _start, target, index, _count);
}
public bool Remove(byte item)
{
throw new NotImplementedException();
}
public int Count
{
get
{
return _count;
}
}
public bool IsReadOnly
{
get
{
return true;
}
}
public byte this[int index]
{
get
{
return _array[index];
}
set
{
if (index > _array.Length)
throw new ArgumentOutOfRangeException("value");
_array[index] = value;
}
}
public IEnumerator<byte> GetEnumerator()
{
return new ByteArraySegmentEnumerator(this);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
class ByteArraySegmentEnumerator : IEnumerator<byte>
{
private byte _current;
private int _pos;
private readonly ByteArraySegment _segment;
public ByteArraySegmentEnumerator(ByteArraySegment segment)
{
_segment = segment;
_pos = segment._start;
}
public bool MoveNext()
{
if (_pos >= _segment.Count)
return false;
_current = _segment._array[++_pos];
return true;
}
public void Reset()
{
_pos = _segment._start;
}
public byte Current
{
get
{
return _current;
}
}
public void Dispose()
{
}
object IEnumerator.Current
{
get { return Current; }
}
}
}
}

View File

@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
namespace TrueCraft.Core.Networking
{
public class BufferManager
{
private readonly object _bufferLocker = new object();
private readonly List<byte[]> _buffers;
private readonly int _bufferSize;
private readonly Stack<int> _availableBuffers;
public BufferManager(int bufferSize)
{
_bufferSize = bufferSize;
_buffers = new List<byte[]>();
_availableBuffers = new Stack<int>();
}
public void SetBuffer(SocketAsyncEventArgs args)
{
if (_availableBuffers.Count > 0)
{
int index = _availableBuffers.Pop();
byte[] buffer;
lock (_bufferLocker)
{
buffer = _buffers[index];
}
args.SetBuffer(buffer, 0, buffer.Length);
}
else
{
byte[] buffer = new byte[_bufferSize];
lock (_bufferLocker)
{
_buffers.Add(buffer);
}
args.SetBuffer(buffer, 0, buffer.Length);
}
}
public void ClearBuffer(SocketAsyncEventArgs args)
{
int index;
lock (_bufferLocker)
{
index = _buffers.IndexOf(args.Buffer);
}
if (index >= 0)
_availableBuffers.Push(index);
args.SetBuffer(null, 0, 0);
}
}
}

View File

@ -0,0 +1,118 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace TrueCraft.Core.Networking
{
public class ByteListMemoryStream : Stream
{
private long _position;
private readonly List<byte> _buffer;
public ByteListMemoryStream() : this(new List<byte>())
{
}
public ByteListMemoryStream(List<byte> buffer, int offset = 0)
{
_position = offset;
_buffer = buffer;
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
if (origin == SeekOrigin.Begin)
_position = offset;
else if (origin == SeekOrigin.Current)
_position += offset;
else //End
_position = (_buffer.Count - 1) - offset;
return _position;
}
public override void SetLength(long value)
{
_buffer.RemoveRange((int)value, _buffer.Count - (int)value);
}
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer.Length < offset)
throw new ArgumentOutOfRangeException("offset");
if (buffer.Length < count)
throw new ArgumentOutOfRangeException("count");
byte[] buf = _buffer.Skip((int)_position).Take(count).ToArray();
Buffer.BlockCopy(buf, 0, buffer, offset, buf.Length);
_position += Math.Min(count, buf.Length);
return Math.Min(count, buf.Length);
}
public override void Write(byte[] buffer, int offset, int count)
{
if (buffer.Length < offset)
throw new ArgumentOutOfRangeException("offset");
if (buffer.Length < count)
throw new ArgumentOutOfRangeException("count");
_buffer.AddRange(buffer.Skip(offset).Take(count));
_position += count;
}
public override bool CanRead
{
get
{
return true;
}
}
public override bool CanSeek
{
get
{
return true;
}
}
public override bool CanWrite
{
get
{
return true;
}
}
public override long Length
{
get
{
return _buffer.Count;
}
}
public override long Position
{
get
{
return _position;
}
set
{
_position = value;
}
}
}
}

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq.Expressions;
using TrueCraft.API.Networking;
using TrueCraft.Core.Networking.Packets;
@ -10,8 +12,17 @@ namespace TrueCraft.Core.Networking
public static readonly int Version = 14;
public int ProtocolVersion { get { return Version; } }
private Func<IPacket>[] ClientboundPackets = new Func<IPacket>[0x100];
private Func<IPacket>[] ServerboundPackets = new Func<IPacket>[0x100];
internal Func<IPacket>[] ClientboundPackets = new Func<IPacket>[0x100];
internal Func<IPacket>[] ServerboundPackets = new Func<IPacket>[0x100];
public ConcurrentDictionary<object, IPacketSegmentProcessor> Processors { get; private set; }
private static readonly byte[] EmptyBuffer = new byte[0];
public PacketReader()
{
Processors = new ConcurrentDictionary<object, IPacketSegmentProcessor>();
}
/// <summary>
/// Registers TrueCraft.Core implementations of all packets used by vanilla Minecraft.
@ -99,19 +110,33 @@ namespace TrueCraft.Core.Networking
ServerboundPackets[packet.ID] = func;
}
public IPacket ReadPacket(IMinecraftStream stream, bool serverbound = true)
public IEnumerable<IPacket> ReadPackets(object key, byte[] buffer, int offset, int length, bool serverbound = true)
{
var id = stream.ReadUInt8();
Func<IPacket> createPacket;
if (serverbound)
createPacket = ServerboundPackets[id];
else
createPacket = ClientboundPackets[id];
if (createPacket == null)
throw new NotSupportedException("Unable to read packet type 0x" + id.ToString("X2"));
var instance = createPacket();
instance.ReadPacket(stream);
return instance;
if (!Processors.ContainsKey(key))
Processors[key] = new PacketSegmentProcessor(this, serverbound);
IPacketSegmentProcessor processor = Processors[key];
IPacket packet;
processor.ProcessNextSegment(buffer, offset, length, out packet);
if (packet == null)
yield break;
while (true)
{
yield return packet;
if (!processor.ProcessNextSegment(EmptyBuffer, 0, 0, out packet))
{
if (packet != null)
{
yield return packet;
}
yield break;
}
}
}
public void WritePacket(IMinecraftStream stream, IPacket packet)

View File

@ -0,0 +1,82 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using TrueCraft.API.Networking;
using TrueCraft.Core.Collections;
namespace TrueCraft.Core.Networking
{
public class PacketSegmentProcessor : IPacketSegmentProcessor
{
public List<byte> PacketBuffer { get; private set; }
public PacketReader PacketReader { get; protected set; }
public bool ServerBound { get; private set; }
public IPacket CurrentPacket { get; protected set; }
public PacketSegmentProcessor(PacketReader packetReader, bool serverBound)
{
PacketBuffer = new List<byte>();
PacketReader = packetReader;
ServerBound = serverBound;
}
public bool ProcessNextSegment(byte[] nextSegment, int offset, int len, out IPacket packet)
{
packet = null;
CurrentPacket = null;
if (nextSegment.Length > 0)
{
PacketBuffer.AddRange(new ByteArraySegment(nextSegment, offset, len));
}
if (PacketBuffer.Count == 0)
return false;
if (CurrentPacket == null)
{
byte packetId = PacketBuffer[0];
Func<IPacket> createPacket;
if (ServerBound)
createPacket = PacketReader.ServerboundPackets[packetId];
else
createPacket = PacketReader.ClientboundPackets[packetId];
if (createPacket == null)
throw new NotSupportedException("Unable to read packet type 0x" + packetId.ToString("X2"));
CurrentPacket = createPacket();
}
using (ByteListMemoryStream listStream = new ByteListMemoryStream(PacketBuffer, 1))
{
using (MinecraftStream ms = new MinecraftStream(listStream))
{
try
{
CurrentPacket.ReadPacket(ms);
}
catch (EndOfStreamException)
{
return false;
}
}
PacketBuffer.RemoveRange(0, (int)listStream.Position);
}
packet = CurrentPacket;
CurrentPacket = null;
return PacketBuffer.Count > 0;
}
}
}

View File

@ -0,0 +1,109 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
namespace TrueCraft.Core.Networking
{
public class SocketAsyncEventArgsPool : IDisposable
{
private readonly BlockingCollection<SocketAsyncEventArgs> _argsPool;
private readonly int _maxPoolSize;
private BufferManager _bufferManager;
public SocketAsyncEventArgsPool(int poolSize, int maxSize, int bufferSize)
{
_maxPoolSize = maxSize;
_argsPool = new BlockingCollection<SocketAsyncEventArgs>(new ConcurrentQueue<SocketAsyncEventArgs>());
_bufferManager = new BufferManager(bufferSize);
Init(poolSize);
}
private void Init(int size)
{
for (int i = 0; i < size; i++)
{
_argsPool.Add(CreateEventArgs());
}
}
public SocketAsyncEventArgs Get()
{
SocketAsyncEventArgs args;
if (!_argsPool.TryTake(out args))
{
args = CreateEventArgs();
}
if (_argsPool.Count > _maxPoolSize)
{
Trim(_argsPool.Count - _maxPoolSize);
}
return args;
}
public void Add(SocketAsyncEventArgs args)
{
if (!_argsPool.IsAddingCompleted)
_argsPool.Add(args);
}
protected SocketAsyncEventArgs CreateEventArgs()
{
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
_bufferManager.SetBuffer(args);
return args;
}
public void Trim(int count)
{
for (int i = 0; i < count; i++)
{
SocketAsyncEventArgs args;
if (_argsPool.TryTake(out args))
{
_bufferManager.ClearBuffer(args);
args.Dispose();
}
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_argsPool.CompleteAdding();
while (_argsPool.Count > 0)
{
SocketAsyncEventArgs arg = _argsPool.Take();
_bufferManager.ClearBuffer(arg);
arg.Dispose();
}
}
_bufferManager = null;
}
~SocketAsyncEventArgsPool()
{
Dispose(false);
}
}
}

View File

@ -39,6 +39,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Collections\ByteArraySegment.cs" />
<Compile Include="Logging\LogHelpers.cs" />
<Compile Include="Entities\Entity.cs" />
<Compile Include="Entities\EntityEventArgs.cs" />
@ -113,6 +114,10 @@
<Compile Include="Logic\Items\SwordItem.cs" />
<Compile Include="Logic\Items\ToolItem.cs" />
<Compile Include="Logic\Items\WheatItem.cs" />
<Compile Include="Networking\BufferManager.cs" />
<Compile Include="Networking\ByteListMemoryStream.cs" />
<Compile Include="Networking\PacketSegmentProcessor.cs" />
<Compile Include="Networking\SocketAsyncEventArgsPool.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Networking\MinecraftStream.cs" />
<Compile Include="Networking\PacketReader.cs" />

View File

@ -45,7 +45,7 @@ namespace TrueCraft.Core.World
}
set
{
BaseTime = DateTime.Now.AddSeconds(-value/20);
BaseTime = DateTime.Now.AddSeconds(-value / 20);
}
}
@ -79,8 +79,10 @@ namespace TrueCraft.Core.World
{
if (!Directory.Exists(baseDirectory))
throw new DirectoryNotFoundException();
var world = new World(Path.GetFileName(baseDirectory));
world.BaseDirectory = baseDirectory;
if (File.Exists(Path.Combine(baseDirectory, "manifest.nbt")))
{
var file = new NbtFile(Path.Combine(baseDirectory, "manifest.nbt"));
@ -93,8 +95,11 @@ namespace TrueCraft.Core.World
if (file.RootTag.Contains("Name"))
world.Name = file.RootTag["Name"].StringValue;
world.ChunkProvider = provider;
return world;
}
return world;
throw new FileNotFoundException();
}
/// <summary>
@ -207,7 +212,7 @@ namespace TrueCraft.Core.World
var adjustedCoordinates = FindBlockPosition(coordinates, out chunk);
var old = GetBlockDataFromChunk(adjustedCoordinates, chunk, coordinates);
chunk.SetBlockID(adjustedCoordinates, descriptor.ID);
chunk.SetMetadata(adjustedCoordinates,descriptor.Metadata);
chunk.SetMetadata(adjustedCoordinates, descriptor.Metadata);
if (BlockChanged != null)
BlockChanged(this, new BlockChangeEventArgs(coordinates, old, GetBlockDataFromChunk(adjustedCoordinates, chunk, coordinates)));
}

View File

@ -18,7 +18,7 @@ using TrueCraft.Core.Logic;
namespace TrueCraft
{
public class MultiplayerServer : IMultiplayerServer
public class MultiplayerServer : IMultiplayerServer, IDisposable
{
public event EventHandler<ChatMessageEventArgs> ChatMessageReceived;
public event EventHandler<PlayerJoinedQuitEventArgs> PlayerJoined;
@ -38,6 +38,7 @@ namespace TrueCraft
public IPEndPoint EndPoint { get; private set; }
private bool _BlockUpdatesEnabled = true;
private struct BlockUpdate
{
public Coordinates3D Coordinates;
@ -61,19 +62,18 @@ namespace TrueCraft
}
private Timer EnvironmentWorker;
private Thread NetworkWorker;
private TcpListener Listener;
private readonly PacketHandler[] PacketHandlers;
private IList<ILogProvider> LogProviders;
internal object ClientLock = new object();
private bool ShuttingDown = false;
public bool ShuttingDown { get; private set; }
public MultiplayerServer()
{
var reader = new PacketReader();
PacketReader = reader;
Clients = new List<IRemoteClient>();
NetworkWorker = new Thread(new ThreadStart(DoNetwork));
EnvironmentWorker = new Timer(DoEnvironment);
PacketHandlers = new PacketHandler[0x100];
Worlds = new List<IWorld>();
@ -109,9 +109,14 @@ namespace TrueCraft
Listener = new TcpListener(endPoint);
Listener.Start();
EndPoint = (IPEndPoint)Listener.LocalEndpoint;
Listener.BeginAcceptTcpClient(AcceptClient, null);
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += AcceptClient;
if (!Listener.Server.AcceptAsync(args))
AcceptClient(this, args);
Log(LogCategory.Notice, "Running TrueCraft server on {0}", EndPoint);
NetworkWorker.Start();
EnvironmentWorker.Change(100, 1000 / 20);
}
@ -229,18 +234,18 @@ namespace TrueCraft
PlayerQuit(this, e);
}
private void LogPacket(IPacket packet, bool clientToServer)
{
for (int i = 0, LogProvidersCount = LogProviders.Count; i < LogProvidersCount; i++)
{
var provider = LogProviders[i];
packet.Log(provider, clientToServer);
}
}
private void DisconnectClient(IRemoteClient _client)
public void DisconnectClient(IRemoteClient _client)
{
var client = (RemoteClient)_client;
lock (ClientLock)
{
Clients.Remove(client);
}
if (client.Disconnected)
return;
if (client.LoggedIn)
{
SendMessage(ChatColor.Yellow + "{0} has left the server.", client.Username);
@ -248,24 +253,32 @@ namespace TrueCraft
GetEntityManagerForWorld(client.World).FlushDespawns();
}
client.Save();
client.Disconnected = true;
client.Disconnect();
OnPlayerQuit(new PlayerJoinedQuitEventArgs(client));
client.Dispose();
}
private void AcceptClient(IAsyncResult result)
private void AcceptClient(object sender, SocketAsyncEventArgs args)
{
try
{
var tcpClient = Listener.EndAcceptTcpClient(result);
var client = new RemoteClient(this, tcpClient.GetStream());
var client = new RemoteClient(this, PacketReader, PacketHandlers, args.AcceptSocket);
lock (ClientLock)
Clients.Add(client);
Listener.BeginAcceptTcpClient(AcceptClient, null);
}
catch
{
// Who cares
}
finally
{
args.AcceptSocket = null;
if (!ShuttingDown && !Listener.Server.AcceptAsync(args))
AcceptClient(this, args);
}
}
private void DoEnvironment(object discarded)
@ -279,116 +292,6 @@ namespace TrueCraft
}
}
private void DoNetwork()
{
while (true)
{
if (ShuttingDown)
return;
bool idle = true;
for (int i = 0; i < Clients.Count && i >= 0; i++)
{
RemoteClient client;
lock (ClientLock)
client = Clients[i] as RemoteClient;
if (client == null)
continue;
while (client.PacketQueue.Count != 0)
{
idle = false;
try
{
IPacket packet;
if (client.PacketQueue.TryTake(out packet))
{
LogPacket(packet, false);
PacketReader.WritePacket(client.MinecraftStream, packet);
client.MinecraftStream.BaseStream.Flush();
if (packet is DisconnectPacket)
{
DisconnectClient(client);
break;
}
}
}
catch (SocketException e)
{
Log(LogCategory.Debug, "Disconnecting client due to exception in network worker");
Log(LogCategory.Debug, e.ToString());
PacketReader.WritePacket(client.MinecraftStream, new DisconnectPacket("An exception has occured on the server."));
client.MinecraftStream.BaseStream.Flush();
DisconnectClient(client);
break;
}
catch (Exception e)
{
Log(LogCategory.Debug, "Disconnecting client due to exception in network worker");
Log(LogCategory.Debug, e.ToString());
DisconnectClient(client);
break;
}
}
if (client.Disconnected)
{
lock (ClientLock)
Clients.RemoveAt(i);
break;
}
const long maxTicks = 100000 * 200; // 200ms
var start = DateTime.Now;
while (client.DataAvailable && (DateTime.Now.Ticks - start.Ticks) < maxTicks)
{
idle = false;
try
{
var packet = PacketReader.ReadPacket(client.MinecraftStream);
client.LastSuccessfulPacket = packet;
LogPacket(packet, true);
if (PacketHandlers[packet.ID] != null)
PacketHandlers[packet.ID](packet, client, this);
else
client.Log("Unhandled packet {0}", packet.GetType().Name);
}
catch (PlayerDisconnectException)
{
DisconnectClient(client);
break;
}
catch (SocketException e)
{
Log(LogCategory.Debug, "Disconnecting client due to exception in network worker");
Log(LogCategory.Debug, e.ToString());
DisconnectClient(client);
break;
}
catch (Exception e)
{
Log(LogCategory.Debug, "Disconnecting client due to exception in network worker");
Log(LogCategory.Debug, e.ToString());
try
{
PacketReader.WritePacket(client.MinecraftStream, new DisconnectPacket("An exception has occured on the server."));
client.MinecraftStream.BaseStream.Flush();
}
catch { /* Silently ignore, by now it's too late */ }
DisconnectClient(client);
break;
}
}
if (idle)
Thread.Sleep(100);
if (client.Disconnected)
{
lock (ClientLock)
Clients.RemoveAt(i);
break;
}
}
}
}
public bool PlayerIsWhitelisted(string client)
{
return AccessConfiguration.Whitelist.Contains(client, StringComparer.CurrentCultureIgnoreCase);
@ -403,5 +306,25 @@ namespace TrueCraft
{
return AccessConfiguration.Oplist.Contains(client, StringComparer.CurrentCultureIgnoreCase);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
Stop();
}
}
~MultiplayerServer()
{
Dispose(false);
}
}
}

View File

@ -27,9 +27,9 @@ namespace TrueCraft
Server = new MultiplayerServer();
Server.AddLogProvider(new ConsoleLogProvider(LogCategory.Notice | LogCategory.Warning | LogCategory.Error | LogCategory.Debug));
#if DEBUG
#if DEBUG
Server.AddLogProvider(new FileLogProvider(new StreamWriter("packets.log", false), LogCategory.Packets));
#endif
#endif
ServerConfiguration = Configuration.LoadConfiguration<ServerConfiguration>("config.yaml");

View File

@ -23,13 +23,10 @@ using TrueCraft.API.Logic;
namespace TrueCraft
{
public class RemoteClient : IRemoteClient
public class RemoteClient : IRemoteClient, IDisposable
{
public RemoteClient(IMultiplayerServer server, NetworkStream stream)
public RemoteClient(IMultiplayerServer server, IPacketReader packetReader, PacketHandler[] packetHandlers, Socket connection)
{
NetworkStream = stream;
MinecraftStream = new MinecraftStream(new TrueCraft.Core.Networking.BufferedStream(NetworkStream));
PacketQueue = new BlockingCollection<IPacket>(new ConcurrentQueue<IPacket>());
LoadedChunks = new List<Coordinates2D>();
Server = server;
Inventory = new InventoryWindow(server.CraftingRepository);
@ -41,18 +38,25 @@ namespace TrueCraft
Disconnected = false;
EnableLogging = server.EnableClientLogging;
NextWindowID = 1;
Connection = connection;
SocketPool = new SocketAsyncEventArgsPool(100, 200, 65536);
PacketReader = packetReader;
PacketHandlers = packetHandlers;
_cancel = new CancellationTokenSource();
StartReceive();
}
/// <summary>
/// A list of entities that this client is aware of.
/// </summary>
internal List<IEntity> KnownEntities { get; set; }
internal bool Disconnected { get; set; }
internal sbyte NextWindowID { get; set; }
public NetworkStream NetworkStream { get; set; }
//public NetworkStream NetworkStream { get; set; }
public bool Disconnected { get; private set; }
public IMinecraftStream MinecraftStream { get; internal set; }
public BlockingCollection<IPacket> PacketQueue { get; private set; }
public string Username { get; internal set; }
public bool LoggedIn { get; internal set; }
public IMultiplayerServer Server { get; set; }
@ -64,7 +68,21 @@ namespace TrueCraft
public bool EnableLogging { get; set; }
public IPacket LastSuccessfulPacket { get; set; }
public Socket Connection { get; private set; }
private SemaphoreSlim _sem = new SemaphoreSlim(1, 1);
private SocketAsyncEventArgsPool SocketPool { get; set; }
public IPacketReader PacketReader { get; private set; }
private PacketHandler[] PacketHandlers { get; set; }
private IEntity _Entity;
private readonly CancellationTokenSource _cancel;
public IEntity Entity
{
get
@ -116,7 +134,7 @@ namespace TrueCraft
{
get
{
return NetworkStream.DataAvailable;
return true;
}
}
@ -195,7 +213,110 @@ namespace TrueCraft
public void QueuePacket(IPacket packet)
{
PacketQueue.Add(packet);
if (Disconnected || (Connection != null && !Connection.Connected))
return;
using (MemoryStream writeStream = new MemoryStream())
{
using (MinecraftStream ms = new MinecraftStream(writeStream))
{
writeStream.WriteByte(packet.ID);
packet.WritePacket(ms);
}
byte[] buffer = writeStream.ToArray();
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.UserToken = packet;
args.Completed += Operation_Completed;
args.SetBuffer(buffer, 0, buffer.Length);
if (Connection != null)
{
if (!Connection.SendAsync(args))
Operation_Completed(this, args);
}
}
}
private void StartReceive()
{
SocketAsyncEventArgs args = SocketPool.Get();
args.Completed += Operation_Completed;
if (!Connection.ReceiveAsync(args))
Operation_Completed(this, args);
}
private void Operation_Completed(object sender, SocketAsyncEventArgs e)
{
e.Completed -= Operation_Completed;
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessNetwork(e);
SocketPool.Add(e);
break;
case SocketAsyncOperation.Send:
IPacket packet = e.UserToken as IPacket;
if (packet is DisconnectPacket)
Server.DisconnectClient(this);
e.SetBuffer(null, 0, 0);
break;
}
}
private void ProcessNetwork(SocketAsyncEventArgs e)
{
if (Server.ShuttingDown)
return;
if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
{
SocketAsyncEventArgs newArgs = SocketPool.Get();
newArgs.Completed += Operation_Completed;
if (!Connection.ReceiveAsync(newArgs))
Operation_Completed(this, newArgs);
_sem.Wait(_cancel.Token);
var packets = PacketReader.ReadPackets(this, e.Buffer, e.Offset, e.BytesTransferred);
foreach (IPacket packet in packets)
{
LastSuccessfulPacket = packet;
if (PacketHandlers[packet.ID] != null)
PacketHandlers[packet.ID](packet, this, Server);
else
Log("Unhandled packet {0}", packet.GetType().Name);
}
if (_sem != null)
_sem.Release();
}
else
{
Server.DisconnectClient(this);
}
}
public void Disconnect()
{
if (!Disconnected)
return;
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
Connection.DisconnectAsync(args);
Disconnected = true;
_cancel.Cancel();
}
public void SendMessage(string message)
@ -335,5 +456,33 @@ namespace TrueCraft
var result = ZlibStream.CompressBuffer(data);
return new ChunkDataPacket(X * Chunk.Width, 0, Z * Chunk.Depth, Chunk.Width, Chunk.Height, Chunk.Depth, result);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
IPacketSegmentProcessor processor;
while (!PacketReader.Processors.TryRemove(this, out processor))
Thread.Sleep(1);
Disconnect();
_sem.Dispose();
}
_sem = null;
}
~RemoteClient()
{
Dispose(false);
}
}
}