OpenOS 1.6.5 update: fixes and optimizations; one API removed

process: process.running has been deprecated for over 2 years and is now obsolete, please use process.info
process: added process.internal.run designed to run a coroutine to completion
01_process: cleaner coroutine_handler intercept code
event: removed unnecessary local function and added an optional handlers param to register to simplify thread code
io: update popen to use new /lib/pipe.lua library
pipe: new /lib/pipe replaces /lib/pipes
> superior coroutine stack code, can reparent stacks
> handles all pipe work, sh calls /lib/pipe now for pipes
sh: now uses /lib/pipe to build pipe chains, calls shell.run to run a pipe chain.
thread: greatly improved reliability of embedded and detached threads
boot: removed dead code
This commit is contained in:
payonel 2017-06-18 00:02:08 -07:00
parent a61204aee8
commit 5fdee17fcd
11 changed files with 363 additions and 629 deletions

View File

@ -3,25 +3,11 @@ local process = require("process")
--Initialize coroutine library--
local _coroutine = coroutine -- real coroutine backend
_G.coroutine = {}
package.loaded.coroutine = _G.coroutine
_G.coroutine = setmetatable({}, {__index = function(_, key)
return assert(process.info(_coroutine.running()), "thread has no proc").data.coroutine_handler[key]
end})
for key,value in pairs(_coroutine) do
if type(value) == "function" and value ~= "running" and value ~= "create" then
_G.coroutine[key] = function(...)
local thread = _coroutine.running()
local info = process.info(thread)
-- note the gc thread does not have a process info
assert(info,"process not found for " .. tostring(thread))
local data = info.data
local co = data.coroutine_handler
local handler = co[key]
return handler(...)
end
else
_G.coroutine[key] = value
end
end
package.loaded.coroutine = _G.coroutine
local kernel_load = _G.load
local intercept_load
@ -72,7 +58,7 @@ process.list[init_thread] = {
{
vars={},
io={}, --init will populate this
coroutine_handler=setmetatable({}, {__index=_coroutine})
coroutine_handler = _coroutine
},
instances = setmetatable({}, {__mode="v"})
}

View File

@ -7,7 +7,7 @@ local lastInterrupt = -math.huge
event.handlers = handlers
function event.register(key, callback, interval, times)
function event.register(key, callback, interval, times, opt_handlers)
local handler =
{
key = key,
@ -17,26 +17,17 @@ function event.register(key, callback, interval, times)
}
handler.timeout = computer.uptime() + handler.interval
opt_handlers = opt_handlers or handlers
local id = 0
repeat
id = id + 1
until not handlers[id]
until not opt_handlers[id]
handlers[id] = handler
opt_handlers[id] = handler
return id
end
local function time_to_nearest()
local timeout = math.huge
for _,handler in pairs(handlers) do
if timeout > handler.timeout then
timeout = handler.timeout
end
end
return timeout
end
local _pullSignal = computer.pullSignal
setmetatable(handlers, {__call=function(_,...)return _pullSignal(...)end})
computer.pullSignal = function(...) -- dispatch
@ -202,7 +193,10 @@ function event.pullFiltered(...)
local deadline = seconds and (computer.uptime() + seconds) or math.huge
repeat
local closest = math.min(deadline, time_to_nearest())
local closest = deadline
for _,handler in pairs(handlers) do
closest = math.min(handler.timeout, closest)
end
local signal = table.pack(computer.pullSignal(closest - computer.uptime()))
if signal.n > 0 then
if not (seconds or filter) or filter == nil or filter(table.unpack(signal, 1, signal.n)) then

View File

@ -77,7 +77,7 @@ function io.error(file)
end
function io.popen(prog, mode, env)
return require("pipes").popen(prog, mode, env)
return require("pipe").popen(prog, mode, env)
end
function io.read(...)

View File

@ -0,0 +1,207 @@
local process = require("process")
local shell = require("shell")
local buffer = require("buffer")
local command_result_as_code = require("sh").internal.command_result_as_code
local pipe = {}
--- Wraps a process coroutine in a "pipe coroutine" (pco) handler so nested
--- coroutine stacks can forward yields upward (reparenting).
-- root can be a coroutine or a function; a function is first loaded as a
-- new process via process.load.
-- @param root thread or function
-- @param env optional environment table passed to process.load
-- @param name optional process name (defaults to "pipe")
-- @return the pco table (also installed as the process's coroutine_handler)
function pipe.createCoroutineStack(root, env, name)
  checkArg(1, root, "thread", "function")
  if type(root) == "function" then
    root = assert(process.load(root, env, nil, name or "pipe"), "failed to load proc data for given function")
  end
  local proc = assert(process.info(root), "failed to load proc data for given coroutine")
  local _co = proc.data.coroutine_handler
  -- pco delegates anything it doesn't override to the previous handler
  local pco = setmetatable({root=root}, {__index=_co})
  proc.data.coroutine_handler = pco
  -- a normal yield is tagged with a leading nil so resume can distinguish it
  -- from a system-wide yield_all (tagged with true)
  function pco.yield(...)
    return _co.yield(nil, ...)
  end
  function pco.yield_all(...)
    return _co.yield(true, ...)
  end
  function pco.resume(co, ...)
    checkArg(1, co, "thread")
    local args = table.pack(...)
    while true do -- for consecutive sysyields
      local result = table.pack(_co.resume(co, table.unpack(args, 1, args.n)))
      if result[1] then -- success: (true, sysval?, ...?)
        if _co.status(co) == "dead" or pco.root == co then -- return: (true, ...)
          return true, table.unpack(result, 2, result.n)
        elseif result[2] ~= nil then -- yield: (true, sysval)
          args = table.pack(_co.yield(table.unpack(result, 2, result.n)))
        else -- yield: (true, nil, ...)
          return true, table.unpack(result, 3, result.n)
        end
      else -- error: result = (false, string)
        return false, result[2]
      end
    end
  end
  return pco
end
-- Stream backing one link of a pipe chain: the writer side buffers data and
-- resumes the next thread in the chain; the reader side drains the buffer.
local pipe_stream =
{
  close = function(self)
    self.closed = true
    -- wake the downstream thread so it can observe eof
    if coroutine.status(self.next) == "suspended" then
      coroutine.resume(self.next)
    end
    self.redirect = {}
  end,
  seek = function()
    return nil, "bad file descriptor"
  end,
  write = function(self, value)
    if not self.redirect[1] and self.closed then
      -- if next is dead, ignore all writes
      if coroutine.status(self.next) ~= "dead" then
        io.stderr:write("attempt to use a closed stream\n")
        os.exit(1)
      end
    elseif self.redirect[1] then
      return self.redirect[1]:write(value)
    elseif not self.closed then
      self.buffer = self.buffer .. value
      local result = table.pack(coroutine.resume(self.next))
      if coroutine.status(self.next) == "dead" then
        self:close()
        -- always cause os.exit when the pipe closes
        -- this is very important
        -- e.g. cat very_large_file | head
        -- when head is done, cat should stop
        result[1] = nil
      end
      -- the next pipe
      if not result[1] then
        os.exit(command_result_as_code(result[2]))
      end
      return self
    end
    os.exit(0) -- abort the current process: SIGPIPE
  end,
  read = function(self, n)
    if self.closed then
      return nil -- eof
    end
    if self.redirect[0] then
      -- popen could be using this code path
      -- if that is the case, it is important to leave stream.buffer alone
      return self.redirect[0]:read(n)
    elseif self.buffer == "" then
      -- nothing buffered: yield until the writer resumes us with data
      coroutine.yield()
    end
    local result = string.sub(self.buffer, 1, n)
    self.buffer = string.sub(self.buffer, n + 1)
    return result
  end
}
-- prog1 | prog2 | ... | progn
--- Links the given programs (threads or functions) with pipe_streams so that
--- each program's stdout feeds the next program's stdin.
-- @param progs array of threads and/or functions
-- @return the array of chained threads
function pipe.buildPipeChain(progs)
  local chain = {}
  local prev_piped_stream
  for i=1,#progs do
    local prog = progs[i]
    -- functions are loaded into their own process/coroutine stack
    local thread = type(prog) == "thread" and prog or pipe.createCoroutineStack(prog).root
    chain[i] = thread
    local data = process.info(thread).data
    local pio = data.io
    local piped_stream
    if i < #progs then
      -- preserve any explicit stdout redirect of this program
      local handle = setmetatable({redirect = {rawget(pio, 1)},buffer = ""}, {__index = pipe_stream})
      piped_stream = buffer.new("rw", handle)
      piped_stream:setvbuf("no", 1024)
      -- buffer close flushes the buffer, but we have no buffer
      -- also, when the buffer is closed, reads and writes don't pass through
      -- simply put, we don't want buffer:close
      piped_stream.close = function(self) self.stream:close() end
      pio[1] = piped_stream
      table.insert(data.handles, piped_stream)
    end
    if prev_piped_stream then
      prev_piped_stream.stream.redirect[0] = rawget(pio, 0)
      prev_piped_stream.stream.next = thread
      pio[0] = prev_piped_stream
    end
    prev_piped_stream = piped_stream
  end
  return chain
end
-- Stream used by popen to drive a pipe chain from the caller's side.
local chain_stream =
{
  read = function(self, value)
    -- handler is currently on yield_all [else we wouldn't have control here]
    local stack_ok, read_ok, ret = self.pco.resume(self.pco.root, value)
    -- both statuses ok -> select from index 2 (just ret); otherwise from 1 (nil, ret)
    return select(stack_ok and read_ok and 2 or 1, nil, ret)
  end,
  write = function(self, ...)
    -- writes funnel through read: the pipe handler consumes the concatenated value
    return self:read(table.concat({...}))
  end,
  close = function(self)
    self.io_stream:close()
  end,
}
--- Runs prog in its own process and returns a buffered stream connected to it.
-- @param prog command string executed via shell.execute
-- @param mode "r" to read prog's output (default) or "w" to write its input
-- @param env optional environment table
-- @return a buffered stream, or nil and an error message on bad mode
function pipe.popen(prog, mode, env)
  mode = mode or "r"
  if mode ~= "r" and mode ~= "w" then
    return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w"
  end
  local r = mode == "r"
  local key = r and "read" or "write"
  -- to simplify the code - shell.execute is run within a function to pass (prog, env)
  -- if cmd_proc were to come second (mode=="w") then the pipe_proc would have to pass
  -- the starting args. which is possible, just more complicated
  local cmd_proc = process.load(function() return shell.execute(prog, env) end, nil, nil, prog)
  -- the chain stream is the popen controller
  local stream = setmetatable({}, { __index = chain_stream })
  -- the stream needs its own process for io
  local pipe_proc = process.load(function()
    -- reads start with a chunk size of 0; writes start with an empty string
    local n = r and 0 or ""
    while true do
      n = stream.pco.yield_all(stream.io_stream[key](stream.io_stream, n))
    end
  end, nil, nil, "pipe_handler")
  local pipe_index = r and 2 or 1
  local cmd_index = r and 1 or 2
  local chain = {[cmd_index]=cmd_proc, [pipe_index]=pipe_proc}
  -- upgrade coroutine stack
  local cmd_stack = pipe.createCoroutineStack(chain[1])
  -- the processes need to share the coroutine handler to yield the cmd_stack
  process.info(chain[2]).data.coroutine_handler = cmd_stack
  -- link the cmd and pipe proc io
  pipe.buildPipeChain(chain)
  -- store handle to io_stream for easy access later
  stream.io_stream = process.info(cmd_stack.root).data.io[1].stream
  stream.pco = cmd_stack
  -- popen commands start out running, like threads
  cmd_stack.resume(cmd_stack.root)
  local buffered_stream = buffer.new(mode, stream)
  buffered_stream:setvbuf("no", 1024)
  return buffered_stream
end
return pipe

View File

@ -1,372 +0,0 @@
local tx = require("transforms")
local shell = require("shell")
local sh = require("sh")
local process = require("process")
local pipes = {}
local pipeStream = {}
local function bfd() return nil, "bad file descriptor" end
--- Constructs a pipe stream wrapper around the given pipe manager.
-- Method lookup falls back to the pipeStream prototype.
function pipeStream.new(pm)
  return setmetatable({pm = pm}, {__index = pipeStream})
end
--- Resumes the pipe's coroutine stack via resume_all.
-- On failure marks the pipe dead and reports the error message (when one
-- was provided) to stderr; always returns resume_all's results unchanged.
function pipeStream:resume()
  local yield_args = table.pack(self.pm.pco.resume_all())
  if not yield_args[1] then
    self.pm.dead = true
    -- (fixed: the inner condition redundantly re-tested `not yield_args[1]`,
    -- which is always true inside this branch)
    if yield_args[2] then
      io.stderr:write(tostring(yield_args[2]) .. "\n")
    end
  end
  return table.unpack(yield_args)
end
--- Closes the pipe: marks it closed and, when the managed program is still
--- alive, resumes it one final time (passing true) so it can finish.
function pipeStream:close()
  if self.pm.closed then -- already closed
    return
  end
  self.pm.closed = true
  -- if our pco stack is empty, we've already run fully
  if self.pm.pco.top() == nil then
    return
  end
  -- if a thread aborted we have set dead true
  if self.pm.dead then
    return
  end
  -- run prog until dead
  local co = self.pm.pco.previous_handler
  local pco_root = self.pm.threads[1]
  if co.status(pco_root) == "dead" then
    -- I would have liked the pco stack to unwind itself for dead coroutines
    -- maybe I haven't handled aborts correctly
    return
  end
  return self:resume(true)
end
--- Reads up to n bytes from the pipe buffer, resuming the program first when
--- the buffer is empty. Returns nil on eof, or nil plus a message on failure.
function pipeStream:read(n)
  local pm = self.pm
  if pm.closed then
    return bfd()
  end
  if pm:buffer() == '' and not pm.dead then
    local result = table.pack(self:resume())
    if not result[1] then
      -- resume can fail if p1 crashes
      self:close()
      return nil, "pipe closed unexpectedly"
    elseif result.n > 1 and not result[2] then
      return result[2], result[3]
    end
  end
  local result = pm:buffer(n)
  -- NOTE(review): assumes n is a number here (the buffer layer passes a
  -- chunk size); a nil n would error on the `n > 0` comparison -- confirm
  if result == '' and pm.dead and n > 0 then
    return nil -- eof
  end
  return result
end
-- pipes do not support seeking
pipeStream.seek = bfd
--- Appends v to the pipe buffer and resumes the program so it can consume it.
-- Returns self on success, nil plus a message if the pipe died.
function pipeStream:write(v)
  local pm = self.pm
  if pm.closed or pm.dead then
    -- if prog is dead, ignore all writes
    if pm.pco.previous_handler.status(pm.threads[pm.self_id]) ~= "dead" then
      error("attempt to use a closed stream")
    end
    return bfd()
  end
  pm:buffer(pm:buffer() .. v)
  -- allow handler to push write event
  local result = table.pack(self:resume())
  if not result[1] then
    -- resume can fail if p1 crashes
    pm.dead = true
    self:close()
    return nil, "pipe closed unexpectedly"
  end
  return self
end
--- Creates a coroutine-stack handler (pco) that tracks a stack of nested
--- coroutines and can unwind/re-resume across them.
-- @param fp optional function; when given it is loaded as the stack's root process
-- @param init optional init value passed to process.load
-- @param name optional process name (defaults to "pco root")
-- @return the pco table
function pipes.createCoroutineStack(fp, init, name)
  local _co = process.info().data.coroutine_handler
  local pco = setmetatable(
    {
      stack = {},
      next = false,
      create = _co.create,
      wrap = _co.wrap,
      previous_handler = _co
    }, {__index=_co})
  -- topmost (most recently resumed) coroutine in the stack
  function pco.top()
    return pco.stack[#pco.stack]
  end
  function pco.yield(...)
    -- pop last
    pco.set_unwind(pco.running())
    return _co.yield(...)
  end
  -- position of thread in the stack, or nil if it is not a member
  function pco.index_of(thread)
    for i,t in ipairs(pco.stack) do
      if t == thread then
        return i
      end
    end
  end
  -- yields out of the entire stack; resume_all later continues from here
  function pco.yield_all(...)
    local current = pco.running()
    local existing_index = pco.index_of(current)
    assert(existing_index, "cannot yield inactive stack")
    pco.next = current
    return _co.yield(...)
  end
  -- truncates the stack above `from` and lines up the previous entry as next
  function pco.set_unwind(from)
    pco.next = false
    if from then
      local index = pco.index_of(from)
      if index then
        pco.stack = tx.sub(pco.stack, 1, index-1)
        pco.next = pco.stack[index-1] or false
      end
    end
  end
  -- resumes the top of the stack from outside the stack; returns false when
  -- the stack is not in a resumable state, nil+error on crash
  function pco.resume_all(...)
    local base = pco.stack[1]
    local top = pco.top()
    if type(base) ~= "thread" or _co.status(base) ~= "suspended" or
      type(top) ~= "thread" or _co.status(top) ~= "suspended" then
      return false
    end
    local status, result = pcall(function(...)
      local _result = table.pack(pco.resume(top, ...))
      return _result
    end,...)
    if not status then
      return nil, result
    end
    return table.unpack(result)
  end
  function pco.resume(thread, ...)
    checkArg(1, thread, "thread")
    local status = pco.status(thread)
    if status ~= "suspended" then
      local msg = string.format("cannot resume %s coroutine",
        status == "dead" and "dead" or "non-suspended")
      return false, msg
    end
    local current_index = pco.index_of(pco.running())
    local existing_index = pco.index_of(thread)
    if not existing_index then
      -- push the new thread above the current one
      assert(current_index, "pco coroutines cannot resume threads outside the stack")
      pco.stack = tx.concat(tx.sub(pco.stack, 1, current_index), {thread})
    end
    if current_index then
      -- current should be waiting for yield
      pco.next = thread
      local t = table.pack(_co.yield(...)) -- pass args to resume next
      return pco.last == nil and true or pco.last, table.unpack(t,1,t.n)
    else
      -- the stack is not running
      pco.next = false
      local yield_args = table.pack(_co.resume(thread, ...))
      if #pco.stack > 0 then
        -- thread may have crashed (crash unwinds as well)
        -- or we don't have next lined up (unwind)
        if not pco.next or not yield_args[1] then
          -- unwind from current index, not top
          pco.set_unwind(thread)
        end
        -- if next is current thread, yield_all is active
        -- in such a case, yield out first, then resume where we left off
        if pco.next and pco.next ~= thread then
          local next = pco.next
          pco.next = false
          pco.last = yield_args[1]
          return pco.resume(next, table.unpack(yield_args,2,yield_args.n))
        end
      end
      return table.unpack(yield_args)
    end
  end
  -- like coroutine.status, but reports "normal" for stack members that sit
  -- below the currently running stack coroutine
  function pco.status(thread)
    checkArg(1, thread, "thread")
    local current_index = pco.index_of(pco.running())
    local existing_index = pco.index_of(thread)
    if current_index and existing_index and existing_index < current_index then
      local current = pco.stack[current_index]
      if current and _co.status(current) == "running" then
        return "normal"
      end
    end
    return _co.status(thread)
  end
  if fp then
    pco.stack = {process.load(fp,nil,init,name or "pco root")}
    process.info(pco.stack[1]).data.coroutine_handler = pco
  end
  return pco
end
local pipeManager = {}
--- Coroutine body for the reader-side of a pipe: repeatedly yields out of
--- the stack until the managed program dies or the pipe is closed, then
--- marks the pipe dead.
function pipeManager.reader(pm,...)
  while pm.pco.status(pm.threads[pm.prog_id]) ~= "dead" do
    pm.pco.yield_all()
    -- kick back to main thread, true to kick back one further
    if pm.closed then break end
    -- if we are a reader pipe, we leave the buffer alone and yield to previous
    if pm.pco.status(pm.threads[pm.prog_id]) ~= "dead" then
      pm.pco.yield(...)
    end
  end
  pm.dead = true
end
--- Accesses the pipe buffer: a string value replaces the buffer, a number
--- consumes that many bytes, any other value returns the whole buffer.
function pipeManager:buffer(value)
  -- if value but no stream, buffer for buffer
  local s = self and self.pipe and self.pipe.stream
  if not s then
    if type(value) == "string" or self.prewrite then
      -- no stream yet: stash data in a prewrite table until one exists
      self.prewrite = self.prewrite or {}
      s = self.prewrite -- s.buffer will be self.prewrite.buffer
    else
      return ''
    end
  elseif self.prewrite then -- we stored, previously, a prewrite buffer
    -- prepend the stashed data now that a real stream exists
    s.buffer = self.prewrite.buffer .. s.buffer
    self.prewrite = nil
  end
  if type(value) == "string" then
    s.buffer = value
    return value
  elseif type(value) ~= "number" then
    return s.buffer -- don't truncate
  end
  -- numeric value: consume the first `value` bytes
  local result = string.sub(s.buffer, 1, value)
  s.buffer = string.sub(s.buffer, value + 1)
  return result
end
--- Builds a reader object used to redirect the program's stdin to this pipe
--- (writer mode): read yields the stack out until data is buffered or the
--- pipe is closed/dead.
function pipeManager:redirectRead()
  local reader = {pm=self}
  function reader:read(n)
    local pm = self.pm
    local pco = pm.pco
    -- if we have any buffer, return it first
    if pm:buffer() == '' and not pm.closed and not pm.dead then
      pco.yield_all()
    end
    if pm.closed or pm.dead then
      return nil
    end
    return pm:buffer(n)
  end
  return reader
end
--- Creates a pipe around prog running under the user's shell.
-- @param prog command string
-- @param mode "r" (default) or "w"
-- @param env optional environment table
-- @return a pipeStream, or nil and a reason
function pipes.createPipe(prog, mode, env)
  mode = mode or "r"
  if mode ~= "r" and mode ~= "w" then
    return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w"
  end
  local shellPath, reason = shell.resolve(os.getenv("SHELL") or "/bin/sh", "lua")
  if not shellPath then
    return nil, reason
  end
  local pm = setmetatable(
    {dead=false,closed=false,prog=prog,mode=mode,env=env},
    {__index=pipeManager}
  )
  -- reader pipes run the program first in the chain; writer pipes run it second
  pm.prog_id = pm.mode == "r" and 1 or 2
  pm.self_id = pm.mode == "r" and 2 or 1
  pm.handler = pm.mode == "r" and
    function()return pipeManager.reader(pm)end or
    function()pm.dead=true end
  pm.commands = {}
  pm.commands[pm.prog_id] = {shellPath, {}}
  pm.commands[pm.self_id] = {pm.handler, {}}
  -- root runs lazily inside the pco stack: it builds the thread chain and
  -- then runs it to completion
  pm.root = function()
    local reason
    pm.threads, reason = sh.internal.createThreads(pm.commands, pm.env, {[pm.prog_id]=table.pack(pm.env,pm.prog)})
    if not pm.threads then
      pm.dead = true
      return false, reason
    end
    pm.pipe = process.info(pm.threads[1]).data.io[1]
    -- if we are the writer, we need args to resume prog
    if pm.mode == "w" then
      pm.pipe.stream.redirect[0] = pm:redirectRead()
    end
    return sh.internal.runThreads(pm.threads)
  end
  pm.pco = pipes.createCoroutineStack(pm.root)
  return pipeStream.new(pm)
end
--- io.popen backend: opens prog through createPipe and returns a buffered
--- pipe descriptor, or false and a reason on failure.
function pipes.popen(prog, mode, env)
  checkArg(1, prog, "string")
  checkArg(2, mode, "string", "nil")
  checkArg(3, env, "table", "nil")
  local pipe, reason = pipes.createPipe(prog, mode, env)
  if not pipe then
    return false, reason
  end
  -- pipe file descriptor
  local pfd = require("buffer").new(mode, pipe)
  pfd:setvbuf("no", 0) -- 2nd arg is the read chunk size
  -- popen processes start on create (like a thread)
  pfd.stream:resume()
  return pfd
end
return pipes

View File

@ -104,14 +104,8 @@ function process.load(path, env, init, name)
return thread
end
function process.running(level) -- kept for backwards compat, prefer process.info
local info = process.info(level)
if info then
return info.path, info.env, info.command
end
end
function process.info(levelOrThread)
checkArg(1, levelOrThread, "thread", "number", "nil")
local p
if type(levelOrThread) == "thread" then
p = process.findProcess(levelOrThread)
@ -141,4 +135,19 @@ function process.internal.close(thread, result)
process.list[thread] = nil
end
--- Runs coroutine co to completion: forwards co's yields to our own caller
--- and feeds the values we are resumed with back into co.
-- If co dies with an error, the error is written to stderr.
-- @param co the coroutine to drive
-- @param ... initial resume arguments
-- @return co's final results (the ok flag stripped)
function process.internal.continue(co, ...)
  local result = {}
  -- Emulate CC behavior by making yields a filtered event.pull()
  local args = table.pack(...)
  while coroutine.status(co) ~= "dead" do
    result = table.pack(coroutine.resume(co, table.unpack(args, 1, args.n)))
    if coroutine.status(co) ~= "dead" then
      args = table.pack(coroutine.yield(table.unpack(result, 2, result.n)))
    elseif not result[1] then
      -- error objects may be non-strings (e.g. the terminated-signal table
      -- raised by os.exit handling), so stringify before writing
      io.stderr:write(tostring(result[2]))
    end
  end
  return table.unpack(result, 2, result.n)
end
return process

View File

@ -180,29 +180,12 @@ function sh.internal.createThreads(commands, env, start_args)
end
if #threads > 1 then
sh.internal.buildPipeChain(threads)
require("pipe").buildPipeChain(threads)
end
return threads
end
function sh.internal.runThreads(threads)
local result = {}
for i = 1, #threads do
-- Emulate CC behavior by making yields a filtered event.pull()
local thread, args = threads[i], {}
while coroutine.status(thread) ~= "dead" do
result = table.pack(coroutine.resume(thread, table.unpack(args)))
if coroutine.status(thread) ~= "dead" then
args = table.pack(coroutine.yield(table.unpack(result, 2, result.n)))
elseif not result[1] then
io.stderr:write(result[2])
end
end
end
return result[2]
end
function sh.internal.executePipes(pipe_parts, eargs, env)
local commands = {}
for _,words in ipairs(pipe_parts) do
@ -260,7 +243,7 @@ function sh.internal.executePipes(pipe_parts, eargs, env)
local threads, reason = sh.internal.createThreads(commands, env, {[#commands]=eargs})
if not threads then return false, reason end
return sh.internal.runThreads(threads)
return process.internal.continue(threads[1])
end
function sh.execute(env, command, ...)

View File

@ -124,16 +124,9 @@ function shell.execute(command, env, ...)
if not sh then
return false, reason
end
local result = table.pack(coroutine.resume(process.load(function(...)
return sh(...)
end), env, command, ...))
if not result[1] and type(result[2]) == "table" and result[2].reason == "terminated" then
if result[2].code then
return true
else
return false, "terminated"
end
end
local proc = process.load(sh, nil, nil, command)
local result = table.pack(process.internal.continue(proc, env, command, ...))
if result.n == 0 then return true end
return table.unpack(result, 1, result.n)
end

View File

@ -1,49 +1,10 @@
local pipes = require("pipes")
local pipe = require("pipe")
local event = require("event")
local process = require("process")
local computer = require("computer")
local thread = {}
do
local handlers = event.handlers
local handlers_mt = getmetatable(handlers)
-- the event library sets a metatable on handlers, but we set threaded=true
if not handlers_mt.threaded then
-- find the root process
local root_data
for _,p in pairs(process.list) do
if not p.parent then
root_data = p.data
break
end
end
assert(root_data, "thread library panic: no root proc")
handlers_mt.threaded = true
-- if we don't separate root handlers from thread handlers we see double dispatch
-- because the thread calls dispatch on pull as well
root_data.handlers = {} -- root handlers
root_data.pull = handlers_mt.__call -- the real computer.pullSignal
while true do
local key, value = next(handlers)
if not key then break end
root_data.handlers[key] = value
handlers[key] = nil
end
handlers_mt.__index = function(_, key)
return process.info().data.handlers[key]
end
handlers_mt.__newindex = function(_, key, value)
process.info().data.handlers[key] = value
end
handlers_mt.__pairs = function(_, ...)
return pairs(process.info().data.handlers, ...)
end
handlers_mt.__call = function(tbl, ...)
return process.info().data.pull(tbl, ...)
end
end
end
local init_thread
local function waitForDeath(threads, timeout, all)
checkArg(1, threads, "table")
@ -57,7 +18,8 @@ local function waitForDeath(threads, timeout, all)
local dieing = {}
local living = false
for _,t in ipairs(threads) do
local result = t.process and t.process.data.result
local mt = getmetatable(t)
local result = mt.attached.data.result
local proc_ok = type(result) ~= "table" or result[1]
local ready_to_die = t:status() ~= "running" -- suspended is considered dead to exit
or not proc_ok -- the thread is killed if its attached process has a non zero exit
@ -133,57 +95,62 @@ function box_thread:status()
end
function box_thread:join(timeout)
return box_thread_handle.close({self}, timeout)
return waitForDeath({self}, timeout, true)
end
function box_thread:kill()
self:detach()
if self:status() == "dead" then
return
end
getmetatable(self).__status = "dead"
self.pco.stack = {}
getmetatable(self).close()
end
function box_thread:detach()
if not self.process then
return
end
local handles = self.process.data.handles
local btHandle = get_box_thread_handle(handles)
if not btHandle then
return nil, "thread failed to detach, process had no thread handle"
end
for i,h in ipairs(btHandle) do
if h == self then
self.process = nil
return table.remove(btHandle, i)
end
end
return nil, "thread not found in parent process"
return self:attach(init_thread)
end
function box_thread:attach(parent)
checkArg(1, parent, "thread", "number", "nil")
local mt = assert(getmetatable(self), "thread panic: no metadata")
local proc = process.info(parent)
if not proc then return nil, "thread failed to attach, process not found" end
self:detach()
if mt.attached == proc then return self end -- already attached
local waiting_handler
if mt.attached then
local prev_btHandle = assert(get_box_thread_handle(mt.attached.data.handles), "thread panic: no thread handle")
for i,h in ipairs(prev_btHandle) do
if h == self then
table.remove(prev_btHandle, i)
if mt.id then
waiting_handler = assert(mt.attached.data.handlers[mt.id], "thread panic: no event handler")
mt.attached.data.handlers[mt.id] = nil
end
break
end
end
end
-- attach to parent or the current process
self.process = proc
local handles = self.process.data.handles
mt.attached = proc
local handles = proc.data.handles
-- this process may not have a box_thread manager handle
local btHandle = get_box_thread_handle(handles, true)
table.insert(btHandle, self)
return true
if waiting_handler then -- event-waiting
mt.register(waiting_handler.timeout - computer.uptime())
end
return self
end
function thread.create(fp, ...)
checkArg(1, fp, "function")
local t = setmetatable({}, {__status="suspended",__index=box_thread})
t.pco = pipes.createCoroutineStack(function(...)
getmetatable(t).__status = "running"
local t = {}
local mt = {__status="suspended",__index=box_thread}
setmetatable(t, mt)
t.pco = pipe.createCoroutineStack(function(...)
mt.__status = "running"
local fp_co = t.pco.create(fp)
-- run fp_co until dead
-- pullSignal will yield_all past this point
@ -217,34 +184,35 @@ function thread.create(fp, ...)
end
args = table.pack(event.pull(table.unpack(result, 2, result.n)))
end
end)
end, nil, "thread")
--special resume to keep track of process death
local function private_resume(...)
local result = table.pack(t.pco.resume_all(...))
if #t.pco.stack == 0 then
t:detach()
local mt = getmetatable(t)
mt.__status = "dead"
event.push("thread_exit")
function mt.private_resume(...)
mt.id = nil
-- this thread may have been killed
if t:status() == "dead" then return end
local result = table.pack(t.pco.resume(t.pco.root, ...))
if t.pco.status(t.pco.root) == "dead" then
mt.close()
end
return table.unpack(result, 1, result.n)
end
local data = process.info(t.pco.stack[1]).data
data.handlers = {}
data.pull = function(_, timeout)
-- register a timeout handler
-- hack so that event.register sees the root handlers
local data_handlers = data.handlers
data.handlers = process.info(2).data.handlers
event.register(
nil, -- nil key matches anything, timers use false keys
private_resume,
timeout, -- wait for the time specified by the caller
1) -- we only want this thread to wake up once
data.handlers = data_handlers
mt.process = process.list[t.pco.root]
mt.process.data.handlers = {}
function mt.register(timeout)
-- register a timeout handler
mt.id = event.register(
nil, -- nil key matches anything, timers use false keys
mt.private_resume,
timeout, -- wait for the time specified by the caller
1, -- we only want this thread to wake up once
mt.attached.data.handlers) -- optional arg, to specify our own handlers
end
function mt.process.data.pull(_, timeout)
mt.register(timeout)
-- yield_all will yield this pco stack
-- the callback will resume this stack
local event_data
@ -255,10 +223,68 @@ function thread.create(fp, ...)
return table.unpack(event_data, 1, event_data.n)
end
t:attach()
private_resume(...) -- threads start out running
function mt.close()
if t:status() == "dead" then
return
end
local htm = get_box_thread_handle(mt.attached.data.handles)
for _,ht in ipairs(htm) do
if ht == t then
table.remove(htm, _)
break
end
end
mt.__status = "dead"
event.push("thread_exit")
end
t:attach() -- the current process
mt.private_resume(...) -- threads start out running
return t
end
-- One-time event-library hook: splits the globally shared handler table into
-- per-process handler tables so threads can pull/dispatch events independently.
do
  local handlers = event.handlers
  local handlers_mt = getmetatable(handlers)
  -- the event library sets a metatable on handlers, but we set threaded=true
  if not handlers_mt.threaded then
    -- find the root process
    local root_data
    for t,p in pairs(process.list) do
      if not p.parent then
        init_thread = t
        root_data = p.data
        break
      end
    end
    assert(init_thread, "thread library panic: no init thread")
    handlers_mt.threaded = true
    -- handles might be optimized out for memory
    root_data.handles = root_data.handles or {}
    -- if we don't separate root handlers from thread handlers we see double dispatch
    -- because the thread calls dispatch on pull as well
    root_data.handlers = {} -- root handlers
    root_data.pull = handlers_mt.__call -- the real computer.pullSignal
    -- move every existing handler into the root process's private table
    while true do
      local key, value = next(handlers)
      if not key then break end
      root_data.handlers[key] = value
      handlers[key] = nil
    end
    -- from here on, event.handlers transparently proxies the CURRENT
    -- process's handler table and pull function
    handlers_mt.__index = function(_, key)
      return process.info().data.handlers[key]
    end
    handlers_mt.__newindex = function(_, key, value)
      process.info().data.handlers[key] = value
    end
    handlers_mt.__pairs = function(_, ...)
      return pairs(process.info().data.handlers, ...)
    end
    handlers_mt.__call = function(tbl, ...)
      return process.info().data.pull(tbl, ...)
    end
  end
end
return thread

View File

@ -1,7 +1,7 @@
-- called from /init.lua
local raw_loadfile = ...
_G._OSVERSION = "OpenOS 1.6.4"
_G._OSVERSION = "OpenOS 1.6.5"
local component = component
local computer = computer
@ -108,7 +108,6 @@ status("Initializing file system...")
-- Mount the ROM and temporary file systems to allow working on the file
-- system module from this point on.
require("filesystem").mount(computer.getBootAddress(), "/")
package.preload={}
status("Running boot scripts...")

View File

@ -99,36 +99,6 @@ function sh.internal.openCommandRedirects(redirects)
end
end
--- Links threads with in-memory pipe streams so that stdout of thread i
--- feeds stdin of thread i+1.
function sh.internal.buildPipeChain(threads)
  local prev_pipe
  for i=1,#threads do
    local thread = threads[i]
    local data = process.info(thread).data
    local pio = data.io
    local pipe
    if i < #threads then
      pipe = require("buffer").new("rw", sh.internal.newMemoryStream())
      pipe:setvbuf("no", 0)
      -- buffer close flushes the buffer, but we have no buffer
      -- also, when the buffer is closed, reads and writes don't pass through
      -- simply put, we don't want buffer:close
      pipe.close = function(self) self.stream:close() end
      -- preserve any explicit stdout redirect of this thread
      pipe.stream.redirect[1] = rawget(pio, 1)
      pio[1] = pipe
      table.insert(data.handles, pipe)
    end
    if prev_pipe then
      prev_pipe.stream.redirect[0] = rawget(pio, 0)
      prev_pipe.stream.next = thread
      pio[0] = prev_pipe
    end
    prev_pipe = pipe
  end
end
-- takes an eword, returns a list of glob hits or {word} if no globs exist
function sh.internal.glob(eword)
-- words are parts, parts are txt and qr
@ -524,67 +494,6 @@ function sh.internal.remove_negation(chain)
return false
end
--- Creates the raw in-memory stream object backing one link of a shell pipe
--- chain: writes buffer data and resume the next thread; reads drain the buffer.
function sh.internal.newMemoryStream()
  local memoryStream = {}
  function memoryStream:close()
    self.closed = true
    self.redirect = {}
  end
  function memoryStream:seek()
    return nil, "bad file descriptor"
  end
  function memoryStream:read(n)
    if self.closed then
      return nil -- eof
    end
    if self.redirect[0] then
      -- popen could be using this code path
      -- if that is the case, it is important to leave stream.buffer alone
      return self.redirect[0]:read(n)
    elseif self.buffer == "" then
      -- nothing buffered: yield until the writer resumes us with data
      coroutine.yield()
    end
    local result = string.sub(self.buffer, 1, n)
    self.buffer = string.sub(self.buffer, n + 1)
    return result
  end
  function memoryStream:write(value)
    if not self.redirect[1] and self.closed then
      -- if next is dead, ignore all writes
      if coroutine.status(self.next) ~= "dead" then
        io.stderr:write("attempt to use a closed stream\n")
        os.exit(1)
      end
    elseif self.redirect[1] then
      return self.redirect[1]:write(value)
    elseif not self.closed then
      self.buffer = self.buffer .. value
      local result = table.pack(coroutine.resume(self.next))
      if coroutine.status(self.next) == "dead" then
        self:close()
        -- always cause os.exit when the pipe closes
        result[1] = nil
      end
      -- the next pipe
      if not result[1] then
        os.exit(sh.internal.command_result_as_code(result[2]))
      end
      return self
    end
    os.exit(0) -- abort the current process: SIGPIPE
  end
  local stream = {closed = false, buffer = "",
    redirect = {}, result = {}}
  local metatable = {__index = memoryStream,
    __metatable = "memorystream"}
  return setmetatable(stream, metatable)
end
function sh.internal.execute_complex(statements, eargs, env)
for si=1,#statements do local s = statements[si]
local chains = sh.internal.groupChains(s)