From 5fdee17fcd0c40022b6a00c70efdbcdcd0753a42 Mon Sep 17 00:00:00 2001 From: payonel Date: Sun, 18 Jun 2017 00:02:08 -0700 Subject: [PATCH] openos 1.6.5 update fixes and optimizations, one api removed process: process.running has been deprecated for over 2 years and is now obsolete, please use process.info process: added process.internal.run designed to run a coroutine to completion 01_process: cleaner coroutine_handler intercept code event: removed unnecessary local function and added an optional handlers param to register to simplify thread code io: update popen to use new /lib/pipe.lua library pipe: new /lib/pipe replace /lib/pipes > superior coroutine stack code, can reparent stacks > handles all pipe work, sh calls /lib/pipe now for pipes sh: now uses /lib/pipe to build pipe chains, calls shell.run to run a pipe chain. thread: greatly improved reliability of embedded and detached threads boot: removed dead code --- .../loot/openos/boot/01_process.lua | 24 +- .../opencomputers/loot/openos/lib/event.lua | 22 +- .../opencomputers/loot/openos/lib/io.lua | 2 +- .../opencomputers/loot/openos/lib/pipe.lua | 207 ++++++++++ .../opencomputers/loot/openos/lib/pipes.lua | 372 ------------------ .../opencomputers/loot/openos/lib/process.lua | 23 +- .../opencomputers/loot/openos/lib/sh.lua | 21 +- .../opencomputers/loot/openos/lib/shell.lua | 13 +- .../opencomputers/loot/openos/lib/thread.lua | 214 +++++----- .../loot/openos/opt/core/boot.lua | 3 +- .../loot/openos/opt/core/full_sh.lua | 91 ----- 11 files changed, 363 insertions(+), 629 deletions(-) create mode 100644 src/main/resources/assets/opencomputers/loot/openos/lib/pipe.lua delete mode 100644 src/main/resources/assets/opencomputers/loot/openos/lib/pipes.lua diff --git a/src/main/resources/assets/opencomputers/loot/openos/boot/01_process.lua b/src/main/resources/assets/opencomputers/loot/openos/boot/01_process.lua index b514508bd..4c92f6574 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/boot/01_process.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/boot/01_process.lua @@ -3,25 +3,11 @@ local process = require("process") --Initialize coroutine library-- local _coroutine = coroutine -- real coroutine backend -_G.coroutine = {} -package.loaded.coroutine = _G.coroutine +_G.coroutine = setmetatable({}, {__index = function(_, key) + return assert(process.info(_coroutine.running()), "thread has no proc").data.coroutine_handler[key] +end}) -for key,value in pairs(_coroutine) do - if type(value) == "function" and value ~= "running" and value ~= "create" then - _G.coroutine[key] = function(...) - local thread = _coroutine.running() - local info = process.info(thread) - -- note the gc thread does not have a process info - assert(info,"process not found for " .. tostring(thread)) - local data = info.data - local co = data.coroutine_handler - local handler = co[key] - return handler(...) - end - else - _G.coroutine[key] = value - end -end +package.loaded.coroutine = _G.coroutine local kernel_load = _G.load local intercept_load @@ -72,7 +58,7 @@ process.list[init_thread] = { { vars={}, io={}, --init will populate this - coroutine_handler=setmetatable({}, {__index=_coroutine}) + coroutine_handler = _coroutine }, instances = setmetatable({}, {__mode="v"}) } diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/event.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/event.lua index e46040654..5e59e6e72 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/event.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/event.lua @@ -7,7 +7,7 @@ local lastInterrupt = -math.huge event.handlers = handlers -function event.register(key, callback, interval, times) +function event.register(key, callback, interval, times, opt_handlers) local handler = { key = key, @@ -17,26 +17,17 @@ function event.register(key, callback, interval, times) } handler.timeout = computer.uptime() + handler.interval + opt_handlers = opt_handlers or handlers local id = 0 repeat id = id + 1 - until not handlers[id] + until not opt_handlers[id] - handlers[id] = handler + opt_handlers[id] = handler return id end -local function time_to_nearest() - local timeout = math.huge - for _,handler in pairs(handlers) do - if timeout > handler.timeout then - timeout = handler.timeout - end - end - return timeout -end - local _pullSignal = computer.pullSignal setmetatable(handlers, {__call=function(_,...)return _pullSignal(...)end}) computer.pullSignal = function(...) -- dispatch @@ -202,7 +193,10 @@ function event.pullFiltered(...) local deadline = seconds and (computer.uptime() + seconds) or math.huge repeat - local closest = math.min(deadline, time_to_nearest()) + local closest = deadline + for _,handler in pairs(handlers) do + closest = math.min(handler.timeout, closest) + end local signal = table.pack(computer.pullSignal(closest - computer.uptime())) if signal.n > 0 then if not (seconds or filter) or filter == nil or filter(table.unpack(signal, 1, signal.n)) then diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/io.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/io.lua index b0d33946c..400761a3a 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/io.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/io.lua @@ -77,7 +77,7 @@ function io.error(file) end function io.popen(prog, mode, env) - return require("pipes").popen(prog, mode, env) + return require("pipe").popen(prog, mode, env) end function io.read(...) diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/pipe.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/pipe.lua new file mode 100644 index 000000000..9b8cb5f06 --- /dev/null +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/pipe.lua @@ -0,0 +1,207 @@ +local process = require("process") +local shell = require("shell") +local buffer = require("buffer") +local command_result_as_code = require("sh").internal.command_result_as_code + +local pipe = {} + +-- root can be a coroutine or a function +function pipe.createCoroutineStack(root, env, name) + checkArg(1, root, "thread", "function") + + if type(root) == "function" then + root = assert(process.load(root, env, nil, name or "pipe"), "failed to load proc data for given function") + end + + local proc = assert(process.info(root), "failed to load proc data for given coroutine") + local _co = proc.data.coroutine_handler + + local pco = setmetatable({root=root}, {__index=_co}) + proc.data.coroutine_handler = pco + + function pco.yield(...) + return _co.yield(nil, ...) + end + function pco.yield_all(...) + return _co.yield(true, ...) + end + function pco.resume(co, ...) + checkArg(1, co, "thread") + local args = table.pack(...) + while true do -- for consecutive sysyields + local result = table.pack(_co.resume(co, table.unpack(args, 1, args.n))) + if result[1] then -- success: (true, sysval?, ...?) + if _co.status(co) == "dead" or pco.root == co then -- return: (true, ...) + return true, table.unpack(result, 2, result.n) + elseif result[2] ~= nil then -- yield: (true, sysval) + args = table.pack(_co.yield(table.unpack(result, 2, result.n))) + else -- yield: (true, nil, ...) + return true, table.unpack(result, 3, result.n) + end + else -- error: result = (false, string) + return false, result[2] + end + end + end + return pco +end + +local pipe_stream = +{ + close = function(self) + self.closed = true + if coroutine.status(self.next) == "suspended" then + coroutine.resume(self.next) + end + self.redirect = {} + end, + seek = function() + return nil, "bad file descriptor" + end, + write = function(self, value) + if not self.redirect[1] and self.closed then + -- if next is dead, ignore all writes + if coroutine.status(self.next) ~= "dead" then + io.stderr:write("attempt to use a closed stream\n") + os.exit(1) + end + elseif self.redirect[1] then + return self.redirect[1]:write(value) + elseif not self.closed then + self.buffer = self.buffer .. value + local result = table.pack(coroutine.resume(self.next)) + if coroutine.status(self.next) == "dead" then + self:close() + -- always cause os.exit when the pipe closes + -- this is very important + -- e.g. cat very_large_file | head + -- when head is done, cat should stop + result[1] = nil + end + -- the next pipe + if not result[1] then + os.exit(command_result_as_code(result[2])) + end + return self + end + os.exit(0) -- abort the current process: SIGPIPE + end, + read = function(self, n) + if self.closed then + return nil -- eof + end + if self.redirect[0] then + -- popen could be using this code path + -- if that is the case, it is important to leave stream.buffer alone + return self.redirect[0]:read(n) + elseif self.buffer == "" then + coroutine.yield() + end + local result = string.sub(self.buffer, 1, n) + self.buffer = string.sub(self.buffer, n + 1) + return result + end +} + +-- prog1 | prog2 | ... | progn +function pipe.buildPipeChain(progs) + local chain = {} + local prev_piped_stream + for i=1,#progs do + local prog = progs[i] + local thread = type(prog) == "thread" and prog or pipe.createCoroutineStack(prog).root + chain[i] = thread + local data = process.info(thread).data + local pio = data.io + + local piped_stream + if i < #progs then + local handle = setmetatable({redirect = {rawget(pio, 1)},buffer = ""}, {__index = pipe_stream}) + piped_stream = buffer.new("rw", handle) + piped_stream:setvbuf("no", 1024) + -- buffer close flushes the buffer, but we have no buffer + -- also, when the buffer is closed, reads and writes don't pass through + -- simply put, we don't want buffer:close + piped_stream.close = function(self) self.stream:close() end + pio[1] = piped_stream + table.insert(data.handles, piped_stream) + end + + if prev_piped_stream then + prev_piped_stream.stream.redirect[0] = rawget(pio, 0) + prev_piped_stream.stream.next = thread + pio[0] = prev_piped_stream + end + + prev_piped_stream = piped_stream + end + + return chain +end + +local chain_stream = +{ + read = function(self, value) + -- handler is currently on yield_all [else we wouldn't have control here] + local stack_ok, read_ok, ret = self.pco.resume(self.pco.root, value) + return select(stack_ok and read_ok and 2 or 1, nil, ret) + end, + write = function(self, ...) + return self:read(table.concat({...})) + end, + close = function(self) + self.io_stream:close() + end, +} + +function pipe.popen(prog, mode, env) + mode = mode or "r" + if mode ~= "r" and mode ~= "w" then + return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w" + end + + local r = mode == "r" + local key = r and "read" or "write" + + -- to simplify the code - shell.execute is run within a function to pass (prog, env) + -- if cmd_proc where to come second (mode=="w") then the pipe_proc would have to pass + -- the starting args. which is possible, just more complicated + local cmd_proc = process.load(function() return shell.execute(prog, env) end, nil, nil, prog) + + -- the chain stream is the popen controller + local stream = setmetatable({}, { __index = chain_stream }) + + -- the stream needs its own process for io + local pipe_proc = process.load(function() + local n = r and 0 or "" + while true do + n = stream.pco.yield_all(stream.io_stream[key](stream.io_stream, n)) + end + end, nil, nil, "pipe_handler") + + local pipe_index = r and 2 or 1 + local cmd_index = r and 1 or 2 + local chain = {[cmd_index]=cmd_proc, [pipe_index]=pipe_proc} + + -- upgrade coroutine stack + local cmd_stack = pipe.createCoroutineStack(chain[1]) + + -- the processes need to share the coroutine handler to yield the cmd_stack + process.info(chain[2]).data.coroutine_handler = cmd_stack + + -- link the cmd and pipe proc io + pipe.buildPipeChain(chain) + + -- store handle to io_stream from easy access later + stream.io_stream = process.info(cmd_stack.root).data.io[1].stream + stream.pco = cmd_stack + + -- popen commands start out running, like threads + cmd_stack.resume(cmd_stack.root) + + local buffered_stream = buffer.new(mode, stream) + buffered_stream:setvbuf("no", 1024) + return buffered_stream +end + +return pipe diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/pipes.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/pipes.lua deleted file mode 100644 index 4606b94a9..000000000 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/pipes.lua +++ /dev/null @@ -1,372 +0,0 @@ -local tx = require("transforms") -local shell = require("shell") -local sh = require("sh") -local process = require("process") - -local pipes = {} - -local pipeStream = {} -local function bfd() return nil, "bad file descriptor" end -function pipeStream.new(pm) - local stream = {pm=pm} - local metatable = {__index = pipeStream} - return setmetatable(stream, metatable) -end -function pipeStream:resume() - local yield_args = table.pack(self.pm.pco.resume_all()) - if not yield_args[1] then - self.pm.dead = true - - if not yield_args[1] and yield_args[2] then - io.stderr:write(tostring(yield_args[2]) .. "\n") - end - end - return table.unpack(yield_args) -end -function pipeStream:close() - if self.pm.closed then -- already closed - return - end - - self.pm.closed = true - - -- if our pco stack is empty, we've already run fully - if self.pm.pco.top() == nil then - return - end - - -- if a thread aborted we have set dead true - if self.pm.dead then - return - end - - -- run prog until dead - local co = self.pm.pco.previous_handler - local pco_root = self.pm.threads[1] - if co.status(pco_root) == "dead" then - -- I would have liked the pco stack to unwind itself for dead coroutines - -- maybe I haven't handled aborts correctly - return - end - - return self:resume(true) -end -function pipeStream:read(n) - local pm = self.pm - - if pm.closed then - return bfd() - end - - if pm:buffer() == '' and not pm.dead then - local result = table.pack(self:resume()) - if not result[1] then - -- resume can fail if p1 crashes - self:close() - return nil, "pipe closed unexpectedly" - elseif result.n > 1 and not result[2] then - return result[2], result[3] - end - end - - local result = pm:buffer(n) - if result == '' and pm.dead and n > 0 then - return nil -- eof - end - - return result -end -pipeStream.seek = bfd -function pipeStream:write(v) - local pm = self.pm - if pm.closed or pm.dead then - -- if prog is dead, ignore all writes - if pm.pco.previous_handler.status(pm.threads[pm.self_id]) ~= "dead" then - error("attempt to use a closed stream") - end - return bfd() - end - - pm:buffer(pm:buffer() .. v) - - -- allow handler to push write event - local result = table.pack(self:resume()) - if not result[1] then - -- resume can fail if p1 crashes - pm.dead = true - self:close() - return nil, "pipe closed unexpectedly" - end - - return self -end - -function pipes.createCoroutineStack(fp, init, name) - local _co = process.info().data.coroutine_handler - - local pco = setmetatable( - { - stack = {}, - next = false, - create = _co.create, - wrap = _co.wrap, - previous_handler = _co - }, {__index=_co}) - - function pco.top() - return pco.stack[#pco.stack] - end - function pco.yield(...) - -- pop last - pco.set_unwind(pco.running()) - return _co.yield(...) - end - function pco.index_of(thread) - for i,t in ipairs(pco.stack) do - if t == thread then - return i - end - end - end - function pco.yield_all(...) - local current = pco.running() - local existing_index = pco.index_of(current) - assert(existing_index, "cannot yield inactive stack") - pco.next = current - return _co.yield(...) - end - function pco.set_unwind(from) - pco.next = false - if from then - local index = pco.index_of(from) - if index then - pco.stack = tx.sub(pco.stack, 1, index-1) - pco.next = pco.stack[index-1] or false - end - end - end - function pco.resume_all(...) - local base = pco.stack[1] - local top = pco.top() - if type(base) ~= "thread" or _co.status(base) ~= "suspended" or - type(top) ~= "thread" or _co.status(top) ~= "suspended" then - return false - end - - local status, result = pcall(function(...) - local _result = table.pack(pco.resume(top, ...)) - return _result - end,...) - - if not status then - return nil, result - end - - return table.unpack(result) - end - function pco.resume(thread, ...) - checkArg(1, thread, "thread") - local status = pco.status(thread) - if status ~= "suspended" then - local msg = string.format("cannot resume %s coroutine", - status == "dead" and "dead" or "non-suspended") - return false, msg - end - - local current_index = pco.index_of(pco.running()) - local existing_index = pco.index_of(thread) - - if not existing_index then - assert(current_index, "pco coroutines cannot resume threads outside the stack") - pco.stack = tx.concat(tx.sub(pco.stack, 1, current_index), {thread}) - end - - if current_index then - -- current should be waiting for yield - pco.next = thread - local t = table.pack(_co.yield(...)) -- pass args to resume next - return pco.last == nil and true or pco.last, table.unpack(t,1,t.n) - else - -- the stack is not running - pco.next = false - local yield_args = table.pack(_co.resume(thread, ...)) - if #pco.stack > 0 then - -- thread may have crashed (crash unwinds as well) - -- or we don't have next lined up (unwind) - if not pco.next or not yield_args[1] then - -- unwind from current index, not top - pco.set_unwind(thread) - end - - -- if next is current thread, yield_all is active - -- in such a case, yield out first, then resume where we left off - if pco.next and pco.next ~= thread then - local next = pco.next - pco.next = false - pco.last = yield_args[1] - return pco.resume(next, table.unpack(yield_args,2,yield_args.n)) - end - end - - return table.unpack(yield_args) - end - end - function pco.status(thread) - checkArg(1, thread, "thread") - - local current_index = pco.index_of(pco.running()) - local existing_index = pco.index_of(thread) - - if current_index and existing_index and existing_index < current_index then - local current = pco.stack[current_index] - if current and _co.status(current) == "running" then - return "normal" - end - end - - return _co.status(thread) - end - - if fp then - pco.stack = {process.load(fp,nil,init,name or "pco root")} - process.info(pco.stack[1]).data.coroutine_handler = pco - end - - return pco -end - -local pipeManager = {} -function pipeManager.reader(pm,...) - while pm.pco.status(pm.threads[pm.prog_id]) ~= "dead" do - pm.pco.yield_all() - - -- kick back to main thread, true to kick back one further - if pm.closed then break end - - -- if we are a reader pipe, we leave the buffer alone and yield to previous - if pm.pco.status(pm.threads[pm.prog_id]) ~= "dead" then - pm.pco.yield(...) - end - end - pm.dead = true -end - -function pipeManager:buffer(value) - -- if value but no stream, buffer for buffer - - local s = self and self.pipe and self.pipe.stream - if not s then - if type(value) == "string" or self.prewrite then - self.prewrite = self.prewrite or {} - s = self.prewrite -- s.buffer will be self.prewrite.buffer - else - return '' - end - elseif self.prewrite then -- we stored, previously, a prewrite buffer - s.buffer = self.prewrite.buffer .. s.buffer - self.prewrite = nil - end - - if type(value) == "string" then - s.buffer = value - return value - elseif type(value) ~= "number" then - return s.buffer -- don't truncate - end - - local result = string.sub(s.buffer, 1, value) - s.buffer = string.sub(s.buffer, value + 1) - return result -end - -function pipeManager:redirectRead() - local reader = {pm=self} - function reader:read(n) - local pm = self.pm - local pco = pm.pco - -- if we have any buffer, return it first - - if pm:buffer() == '' and not pm.closed and not pm.dead then - pco.yield_all() - end - - if pm.closed or pm.dead then - return nil - end - - return pm:buffer(n) - end - - return reader -end - -function pipes.createPipe(prog, mode, env) - mode = mode or "r" - if mode ~= "r" and mode ~= "w" then - return nil, "bad argument #2: invalid mode " .. tostring(mode) .. " must be r or w" - end - - local shellPath, reason = shell.resolve(os.getenv("SHELL") or "/bin/sh", "lua") - if not shellPath then - return nil, reason - end - - local pm = setmetatable( - {dead=false,closed=false,prog=prog,mode=mode,env=env}, - {__index=pipeManager} - ) - pm.prog_id = pm.mode == "r" and 1 or 2 - pm.self_id = pm.mode == "r" and 2 or 1 - pm.handler = pm.mode == "r" and - function()return pipeManager.reader(pm)end or - function()pm.dead=true end - - pm.commands = {} - pm.commands[pm.prog_id] = {shellPath, {}} - pm.commands[pm.self_id] = {pm.handler, {}} - - pm.root = function() - local reason - pm.threads, reason = sh.internal.createThreads(pm.commands, pm.env, {[pm.prog_id]=table.pack(pm.env,pm.prog)}) - - if not pm.threads then - pm.dead = true - return false, reason - end - - pm.pipe = process.info(pm.threads[1]).data.io[1] - - -- if we are the writer, we need args to resume prog - if pm.mode == "w" then - pm.pipe.stream.redirect[0] = pm:redirectRead() - end - - return sh.internal.runThreads(pm.threads) - end - - pm.pco = pipes.createCoroutineStack(pm.root) - return pipeStream.new(pm) -end - -function pipes.popen(prog, mode, env) - checkArg(1, prog, "string") - checkArg(2, mode, "string", "nil") - checkArg(3, env, "table", "nil") - - local pipe, reason = pipes.createPipe(prog, mode, env) - - if not pipe then - return false, reason - end - - -- pipe file descriptor - local pfd = require("buffer").new(mode, pipe) - pfd:setvbuf("no", 0) -- 2nd are to read chunk size - - -- popen processes start on create (like a thread) - pfd.stream:resume() - - return pfd -end - -return pipes diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/process.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/process.lua index 8b064097f..169fd73bf 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/process.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/process.lua @@ -104,14 +104,8 @@ function process.load(path, env, init, name) return thread end -function process.running(level) -- kept for backwards compat, prefer process.info - local info = process.info(level) - if info then - return info.path, info.env, info.command - end -end - function process.info(levelOrThread) + checkArg(1, levelOrThread, "thread", "number", "nil") local p if type(levelOrThread) == "thread" then p = process.findProcess(levelOrThread) @@ -141,4 +135,19 @@ function process.internal.close(thread, result) process.list[thread] = nil end +function process.internal.continue(co, ...) + local result = {} + -- Emulate CC behavior by making yields a filtered event.pull() + local args = table.pack(...) + while coroutine.status(co) ~= "dead" do + result = table.pack(coroutine.resume(co, table.unpack(args, 1, args.n))) + if coroutine.status(co) ~= "dead" then + args = table.pack(coroutine.yield(table.unpack(result, 2, result.n))) + elseif not result[1] then + io.stderr:write(result[2]) + end + end + return table.unpack(result, 2, result.n) +end + return process diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/sh.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/sh.lua index 84cc77715..7ff6e33d4 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/sh.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/sh.lua @@ -180,29 +180,12 @@ function sh.internal.createThreads(commands, env, start_args) end if #threads > 1 then - sh.internal.buildPipeChain(threads) + require("pipe").buildPipeChain(threads) end return threads end -function sh.internal.runThreads(threads) - local result = {} - for i = 1, #threads do - -- Emulate CC behavior by making yields a filtered event.pull() - local thread, args = threads[i], {} - while coroutine.status(thread) ~= "dead" do - result = table.pack(coroutine.resume(thread, table.unpack(args))) - if coroutine.status(thread) ~= "dead" then - args = table.pack(coroutine.yield(table.unpack(result, 2, result.n))) - elseif not result[1] then - io.stderr:write(result[2]) - end - end - end - return result[2] -end - function sh.internal.executePipes(pipe_parts, eargs, env) local commands = {} for _,words in ipairs(pipe_parts) do @@ -260,7 +243,7 @@ function sh.internal.executePipes(pipe_parts, eargs, env) local threads, reason = sh.internal.createThreads(commands, env, {[#commands]=eargs}) if not threads then return false, reason end - return sh.internal.runThreads(threads) + return process.internal.continue(threads[1]) end function sh.execute(env, command, ...) diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/shell.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/shell.lua index feac9acff..ea536a71a 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/shell.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/shell.lua @@ -124,16 +124,9 @@ function shell.execute(command, env, ...) if not sh then return false, reason end - local result = table.pack(coroutine.resume(process.load(function(...) - return sh(...) - end), env, command, ...)) - if not result[1] and type(result[2]) == "table" and result[2].reason == "terminated" then - if result[2].code then - return true - else - return false, "terminated" - end - end + local proc = process.load(sh, nil, nil, command) + local result = table.pack(process.internal.continue(proc, env, command, ...)) + if result.n == 0 then return true end return table.unpack(result, 1, result.n) end diff --git a/src/main/resources/assets/opencomputers/loot/openos/lib/thread.lua b/src/main/resources/assets/opencomputers/loot/openos/lib/thread.lua index f3234a6af..f602e7b77 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/lib/thread.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/lib/thread.lua @@ -1,49 +1,10 @@ -local pipes = require("pipes") +local pipe = require("pipe") local event = require("event") local process = require("process") local computer = require("computer") local thread = {} - -do - local handlers = event.handlers - local handlers_mt = getmetatable(handlers) - -- the event library sets a metatable on handlers, but we set threaded=true - if not handlers_mt.threaded then - -- find the root process - local root_data - for _,p in pairs(process.list) do - if not p.parent then - root_data = p.data - break - end - end - assert(root_data, "thread library panic: no root proc") - handlers_mt.threaded = true - -- if we don't separate root handlers from thread handlers we see double dispatch - -- because the thread calls dispatch on pull as well - root_data.handlers = {} -- root handlers - root_data.pull = handlers_mt.__call -- the real computer.pullSignal - while true do - local key, value = next(handlers) - if not key then break end - root_data.handlers[key] = value - handlers[key] = nil - end - handlers_mt.__index = function(_, key) - return process.info().data.handlers[key] - end - handlers_mt.__newindex = function(_, key, value) - process.info().data.handlers[key] = value - end - handlers_mt.__pairs = function(_, ...) - return pairs(process.info().data.handlers, ...) - end - handlers_mt.__call = function(tbl, ...) - return process.info().data.pull(tbl, ...) - end - end -end +local init_thread local function waitForDeath(threads, timeout, all) checkArg(1, threads, "table") @@ -57,7 +18,8 @@ local function waitForDeath(threads, timeout, all) local dieing = {} local living = false for _,t in ipairs(threads) do - local result = t.process and t.process.data.result + local mt = getmetatable(t) + local result = mt.attached.data.result local proc_ok = type(result) ~= "table" or result[1] local ready_to_die = t:status() ~= "running" -- suspended is considered dead to exit or not proc_ok -- the thread is killed if its attached process has a non zero exit @@ -133,57 +95,62 @@ function box_thread:status() end function box_thread:join(timeout) - return box_thread_handle.close({self}, timeout) + return waitForDeath({self}, timeout, true) end function box_thread:kill() - self:detach() - if self:status() == "dead" then - return - end - getmetatable(self).__status = "dead" - self.pco.stack = {} + getmetatable(self).close() end function box_thread:detach() - if not self.process then - return - end - local handles = self.process.data.handles - local btHandle = get_box_thread_handle(handles) - if not btHandle then - return nil, "thread failed to detach, process had no thread handle" - end - for i,h in ipairs(btHandle) do - if h == self then - self.process = nil - return table.remove(btHandle, i) - end - end - return nil, "thread not found in parent process" + return self:attach(init_thread) end function box_thread:attach(parent) checkArg(1, parent, "thread", "number", "nil") + local mt = assert(getmetatable(self), "thread panic: no metadata") local proc = process.info(parent) if not proc then return nil, "thread failed to attach, process not found" end - self:detach() + if mt.attached == proc then return self end -- already attached + + local waiting_handler + if mt.attached then + local prev_btHandle = assert(get_box_thread_handle(mt.attached.data.handles), "thread panic: no thread handle") + for i,h in ipairs(prev_btHandle) do + if h == self then + table.remove(prev_btHandle, i) + if mt.id then + waiting_handler = assert(mt.attached.data.handlers[mt.id], "thread panic: no event handler") + mt.attached.data.handlers[mt.id] = nil + end + break + end + end + end + -- attach to parent or the current process - self.process = proc - local handles = self.process.data.handles + mt.attached = proc + local handles = proc.data.handles -- this process may not have a box_thread manager handle local btHandle = get_box_thread_handle(handles, true) table.insert(btHandle, self) - return true + + if waiting_handler then -- event-waiting + mt.register(waiting_handler.timeout - computer.uptime()) + end + + return self end function thread.create(fp, ...) checkArg(1, fp, "function") - local t = setmetatable({}, {__status="suspended",__index=box_thread}) - t.pco = pipes.createCoroutineStack(function(...) - getmetatable(t).__status = "running" + local t = {} + local mt = {__status="suspended",__index=box_thread} + setmetatable(t, mt) + t.pco = pipe.createCoroutineStack(function(...) + mt.__status = "running" local fp_co = t.pco.create(fp) -- run fp_co until dead -- pullSignal will yield_all past this point @@ -217,34 +184,35 @@ function thread.create(fp, ...) end args = table.pack(event.pull(table.unpack(result, 2, result.n))) end - end) + end, nil, "thread") --special resume to keep track of process death - local function private_resume(...) - local result = table.pack(t.pco.resume_all(...)) - if #t.pco.stack == 0 then - t:detach() - local mt = getmetatable(t) - mt.__status = "dead" - event.push("thread_exit") + function mt.private_resume(...) + mt.id = nil + -- this thread may have been killed + if t:status() == "dead" then return end + local result = table.pack(t.pco.resume(t.pco.root, ...)) + if t.pco.status(t.pco.root) == "dead" then + mt.close() end return table.unpack(result, 1, result.n) end - local data = process.info(t.pco.stack[1]).data - data.handlers = {} - data.pull = function(_, timeout) - -- register a timeout handler - -- hack so that event.register sees the root handlers - local data_handlers = data.handlers - data.handlers = process.info(2).data.handlers - event.register( - nil, -- nil key matches anything, timers use false keys - private_resume, - timeout, -- wait for the time specified by the caller - 1) -- we only want this thread to wake up once - data.handlers = data_handlers + mt.process = process.list[t.pco.root] + mt.process.data.handlers = {} + function mt.register(timeout) + -- register a timeout handler + mt.id = event.register( + nil, -- nil key matches anything, timers use false keys + mt.private_resume, + timeout, -- wait for the time specified by the caller + 1, -- we only want this thread to wake up once + mt.attached.data.handlers) -- optional arg, to specify our own handlers + end + + function mt.process.data.pull(_, timeout) + mt.register(timeout) -- yield_all will yield this pco stack -- the callback will resume this stack local event_data @@ -255,10 +223,68 @@ function thread.create(fp, ...) return table.unpack(event_data, 1, event_data.n) end - t:attach() - private_resume(...) -- threads start out running + function mt.close() + if t:status() == "dead" then + return + end + local htm = get_box_thread_handle(mt.attached.data.handles) + for _,ht in ipairs(htm) do + if ht == t then + table.remove(htm, _) + break + end + end + mt.__status = "dead" + event.push("thread_exit") + end + + t:attach() -- the current process + mt.private_resume(...) -- threads start out running return t end +do + local handlers = event.handlers + local handlers_mt = getmetatable(handlers) + -- the event library sets a metatable on handlers, but we set threaded=true + if not handlers_mt.threaded then + -- find the root process + local root_data + for t,p in pairs(process.list) do + if not p.parent then + init_thread = t + root_data = p.data + break + end + end + assert(init_thread, "thread library panic: no init thread") + handlers_mt.threaded = true + -- handles might be optimized out for memory + root_data.handles = root_data.handles or {} + -- if we don't separate root handlers from thread handlers we see double dispatch + -- because the thread calls dispatch on pull as well + root_data.handlers = {} -- root handlers + root_data.pull = handlers_mt.__call -- the real computer.pullSignal + while true do + local key, value = next(handlers) + if not key then break end + root_data.handlers[key] = value + handlers[key] = nil + end + handlers_mt.__index = function(_, key) + return process.info().data.handlers[key] + end + handlers_mt.__newindex = function(_, key, value) + process.info().data.handlers[key] = value + end + handlers_mt.__pairs = function(_, ...) + return pairs(process.info().data.handlers, ...) + end + handlers_mt.__call = function(tbl, ...) + return process.info().data.pull(tbl, ...) + end + end +end + return thread diff --git a/src/main/resources/assets/opencomputers/loot/openos/opt/core/boot.lua b/src/main/resources/assets/opencomputers/loot/openos/opt/core/boot.lua index aeecf5f43..7ad88d429 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/opt/core/boot.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/opt/core/boot.lua @@ -1,7 +1,7 @@ -- called from /init.lua local raw_loadfile = ... -_G._OSVERSION = "OpenOS 1.6.4" +_G._OSVERSION = "OpenOS 1.6.5" local component = component local computer = computer @@ -108,7 +108,6 @@ status("Initializing file system...") -- Mount the ROM and temporary file systems to allow working on the file -- system module from this point on. require("filesystem").mount(computer.getBootAddress(), "/") -package.preload={} status("Running boot scripts...") diff --git a/src/main/resources/assets/opencomputers/loot/openos/opt/core/full_sh.lua b/src/main/resources/assets/opencomputers/loot/openos/opt/core/full_sh.lua index cb7243325..5f29fb565 100644 --- a/src/main/resources/assets/opencomputers/loot/openos/opt/core/full_sh.lua +++ b/src/main/resources/assets/opencomputers/loot/openos/opt/core/full_sh.lua @@ -99,36 +99,6 @@ function sh.internal.openCommandRedirects(redirects) end end -function sh.internal.buildPipeChain(threads) - local prev_pipe - for i=1,#threads do - local thread = threads[i] - local data = process.info(thread).data - local pio = data.io - - local pipe - if i < #threads then - pipe = require("buffer").new("rw", sh.internal.newMemoryStream()) - pipe:setvbuf("no", 0) - -- buffer close flushes the buffer, but we have no buffer - -- also, when the buffer is closed, read and writes don't pass through - -- simply put, we don't want buffer:close - pipe.close = function(self) self.stream:close() end - pipe.stream.redirect[1] = rawget(pio, 1) - pio[1] = pipe - table.insert(data.handles, pipe) - end - - if prev_pipe then - prev_pipe.stream.redirect[0] = rawget(pio, 0) - prev_pipe.stream.next = thread - pio[0] = prev_pipe - end - - prev_pipe = pipe - end -end - -- takes an eword, returns a list of glob hits or {word} if no globs exist function sh.internal.glob(eword) -- words are parts, parts are txt and qr @@ -524,67 +494,6 @@ function sh.internal.remove_negation(chain) return false end -function sh.internal.newMemoryStream() - local memoryStream = {} - - function memoryStream:close() - self.closed = true - self.redirect = {} - end - - function memoryStream:seek() - return nil, "bad file descriptor" - end - - function memoryStream:read(n) - if self.closed then - return nil -- eof - end - if self.redirect[0] then - -- popen could be using this code path - -- if that is the case, it is important to leave stream.buffer alone - return self.redirect[0]:read(n) - elseif self.buffer == "" then - coroutine.yield() - end - local result = string.sub(self.buffer, 1, n) - self.buffer = string.sub(self.buffer, n + 1) - return result - end - - function memoryStream:write(value) - if not self.redirect[1] and self.closed then - -- if next is dead, ignore all writes - if coroutine.status(self.next) ~= "dead" then - io.stderr:write("attempt to use a closed stream\n") - os.exit(1) - end - elseif self.redirect[1] then - return self.redirect[1]:write(value) - elseif not self.closed then - self.buffer = self.buffer .. value - local result = table.pack(coroutine.resume(self.next)) - if coroutine.status(self.next) == "dead" then - self:close() - -- always cause os.exit when the pipe closes - result[1] = nil - end - -- the next pipe - if not result[1] then - os.exit(sh.internal.command_result_as_code(result[2])) - end - return self - end - os.exit(0) -- abort the current process: SIGPIPE - end - - local stream = {closed = false, buffer = "", - redirect = {}, result = {}} - local metatable = {__index = memoryStream, - __metatable = "memorystream"} - return setmetatable(stream, metatable) -end - function sh.internal.execute_complex(statements, eargs, env) for si=1,#statements do local s = statements[si] local chains = sh.internal.groupChains(s)