diff --git a/src/main/scala/li/cil/oc/server/component/InternetCard.scala b/src/main/scala/li/cil/oc/server/component/InternetCard.scala index ec8f74da4..a56d1d5e7 100644 --- a/src/main/scala/li/cil/oc/server/component/InternetCard.scala +++ b/src/main/scala/li/cil/oc/server/component/InternetCard.scala @@ -7,25 +7,26 @@ import java.io.InputStream import java.io.OutputStreamWriter import java.net._ import java.nio.ByteBuffer +import java.nio.channels.SelectionKey +import java.nio.channels.Selector import java.nio.channels.SocketChannel import java.util -import java.util.concurrent.Callable -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.ExecutionException -import java.util.concurrent.Future +import java.util.UUID +import java.util.concurrent._ import li.cil.oc.Constants +import li.cil.oc.OpenComputers +import li.cil.oc.Settings +import li.cil.oc.api.Network +import li.cil.oc.api.driver.DeviceInfo import li.cil.oc.api.driver.DeviceInfo.DeviceAttribute import li.cil.oc.api.driver.DeviceInfo.DeviceClass -import li.cil.oc.Settings import li.cil.oc.api.machine.Arguments import li.cil.oc.api.machine.Callback import li.cil.oc.api.machine.Context import li.cil.oc.api.network._ -import li.cil.oc.api.prefab.AbstractValue -import li.cil.oc.api.Network -import li.cil.oc.api.driver.DeviceInfo import li.cil.oc.api.prefab +import li.cil.oc.api.prefab.AbstractValue import li.cil.oc.util.ThreadPoolFactory import net.minecraft.server.MinecraftServer @@ -183,6 +184,41 @@ object InternetCard { def close(): Unit } + object TCPNotifier extends Thread { + private val selector = Selector.open() + private val toAccept = new ConcurrentLinkedQueue[(SocketChannel, () => Unit)] + + override def run(): Unit = { + while (true) { + try { + Stream.continually(toAccept.poll).takeWhile(_ != null).foreach({ + case (channel: SocketChannel, action: (() => Unit)) => + channel.register(selector, SelectionKey.OP_READ, action) + }) + + selector.select() + + import scala.collection.JavaConversions._ + val selectedKeys = selector.selectedKeys + selectedKeys.filter(_.isReadable).foreach(key => { + key.cancel() + key.attachment().asInstanceOf[() => Unit].apply() + }) + } catch { + case e: IOException => + OpenComputers.log.error("Error in TCP selector loop.", e) + } + } + } + + def add(e: (SocketChannel, () => Unit)) { + toAccept.offer(e) + selector.wakeup() + } + } + + TCPNotifier.start() + class TCPSocket extends AbstractValue with Closable { def this(owner: InternetCard, uri: URI, port: Int) { this() @@ -196,9 +232,25 @@ object InternetCard { private var address: Future[InetAddress] = null private var channel: SocketChannel = null private var isAddressResolved = false + private val id = UUID.randomUUID() + + private def setupSelector() { + TCPNotifier.add((channel, () => { + owner match { + case Some(internetCard) => + internetCard.node.sendToVisible("computer.signal", "internet_ready", internetCard.node.address(), id.toString) + case _ => + channel.close() + } + })) + } @Callback(doc = """function():boolean -- Ensures a socket is connected. Errors if the connection failed.""") - def finishConnect(context: Context, args: Arguments): Array[AnyRef] = this.synchronized(result(checkConnected())) + def finishConnect(context: Context, args: Arguments): Array[AnyRef] = { + val r = this.synchronized(result(checkConnected())) + setupSelector() + r + } @Callback(doc = """function([n:number]):string -- Tries to read data from the socket stream. Returns the read byte array.""") def read(context: Context, args: Arguments): Array[AnyRef] = this.synchronized { @@ -207,7 +259,10 @@ object InternetCard { val buffer = ByteBuffer.allocate(n) val read = channel.read(buffer) if (read == -1) result(Unit) - else result(buffer.array.view(0, read).toArray) + else { + setupSelector() + result(buffer.array.view(0, read).toArray) + } } else result(Array.empty[Byte]) } @@ -227,6 +282,11 @@ object InternetCard { null } + @Callback(direct = true, doc = """function():string -- Returns connection ID.""") + def id(context: Context, args: Arguments): Array[AnyRef] = this.synchronized { + result(id.toString) + } + override def dispose(context: Context): Unit = { super.dispose(context) close()