Send internet events when ready to read.

This commit is contained in:
Łukasz Magiera 2016-06-25 13:17:06 +02:00 committed by Florian Nücke
parent a9f2c6d77b
commit 422592d84e

View File

@ -7,25 +7,26 @@ import java.io.InputStream
import java.io.OutputStreamWriter
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.SocketChannel
import java.util
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.UUID
import java.util.concurrent._
import li.cil.oc.Constants
import li.cil.oc.OpenComputers
import li.cil.oc.Settings
import li.cil.oc.api.Network
import li.cil.oc.api.driver.DeviceInfo
import li.cil.oc.api.driver.DeviceInfo.DeviceAttribute
import li.cil.oc.api.driver.DeviceInfo.DeviceClass
import li.cil.oc.Settings
import li.cil.oc.api.machine.Arguments
import li.cil.oc.api.machine.Callback
import li.cil.oc.api.machine.Context
import li.cil.oc.api.network._
import li.cil.oc.api.prefab.AbstractValue
import li.cil.oc.api.Network
import li.cil.oc.api.driver.DeviceInfo
import li.cil.oc.api.prefab
import li.cil.oc.api.prefab.AbstractValue
import li.cil.oc.util.ThreadPoolFactory
import net.minecraft.server.MinecraftServer
@ -183,6 +184,41 @@ object InternetCard {
def close(): Unit
}
object TCPNotifier extends Thread {
private val selector = Selector.open()
private val toAccept = new ConcurrentLinkedQueue[(SocketChannel, () => Unit)]
override def run(): Unit = {
while (true) {
try {
Stream.continually(toAccept.poll).takeWhile(_ != null).foreach({
case (channel: SocketChannel, action: (() => Unit)) =>
channel.register(selector, SelectionKey.OP_READ, action)
})
selector.select()
import scala.collection.JavaConversions._
val selectedKeys = selector.selectedKeys
selectedKeys.filter(_.isReadable).foreach(key => {
key.cancel()
key.attachment().asInstanceOf[() => Unit].apply()
})
} catch {
case e: IOException =>
OpenComputers.log.error("Error in TCP selector loop.", e)
}
}
}
def add(e: (SocketChannel, () => Unit)) {
toAccept.offer(e)
selector.wakeup()
}
}
TCPNotifier.start()
class TCPSocket extends AbstractValue with Closable {
def this(owner: InternetCard, uri: URI, port: Int) {
this()
@ -196,9 +232,25 @@ object InternetCard {
private var address: Future[InetAddress] = null
private var channel: SocketChannel = null
private var isAddressResolved = false
private val id = UUID.randomUUID()
private def setupSelector() {
TCPNotifier.add((channel, () => {
owner match {
case Some(internetCard) =>
internetCard.node.sendToVisible("computer.signal", "internet_ready", internetCard.node.address(), id.toString)
case _ =>
channel.close()
}
}))
}
@Callback(doc = """function():boolean -- Ensures a socket is connected. Errors if the connection failed.""")
def finishConnect(context: Context, args: Arguments): Array[AnyRef] = this.synchronized(result(checkConnected()))
def finishConnect(context: Context, args: Arguments): Array[AnyRef] = {
val r = this.synchronized(result(checkConnected()))
setupSelector()
r
}
@Callback(doc = """function([n:number]):string -- Tries to read data from the socket stream. Returns the read byte array.""")
def read(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
@ -207,7 +259,10 @@ object InternetCard {
val buffer = ByteBuffer.allocate(n)
val read = channel.read(buffer)
if (read == -1) result(Unit)
else result(buffer.array.view(0, read).toArray)
else {
setupSelector()
result(buffer.array.view(0, read).toArray)
}
}
else result(Array.empty[Byte])
}
@ -227,6 +282,11 @@ object InternetCard {
null
}
@Callback(direct = true, doc = """function():string -- Returns connection ID.""")
def id(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
result(id.toString)
}
override def dispose(context: Context): Unit = {
super.dispose(context)
close()