replacing yield_all with yield_past to control stack yielding

This change allows popen, pipes, and threads to specify exactly how many coroutines back up the stack they want to yield. This makes popen and threads immune to interference from coroutine stacks created internally by the code they run.

Also using a code cleanup provided by @SDPhantom
This commit is contained in:
payonel 2017-06-22 16:46:52 -07:00
parent da059756b9
commit cdfdb277f6
3 changed files with 42 additions and 26 deletions

View File

@ -6,7 +6,11 @@ local _coroutine = coroutine -- real coroutine backend
_G.coroutine = setmetatable( _G.coroutine = setmetatable(
{ {
resume = function(co, ...) resume = function(co, ...)
return assert(process.info(co), "thread has no proc").data.coroutine_handler.resume(co, ...) local proc = process.info(co)
-- proc is nil if the process closed; a natural resume will likely complain that the coroutine is dead
-- but if proc is dead and an aborted coroutine is alive, it doesn't have any proc data like stack info
-- if the user really wants to resume it, let them
return (proc and proc.data.coroutine_handler.resume or _coroutine.resume)(co, ...)
end end
}, },
{ {
@ -51,10 +55,7 @@ end
_coroutine.wrap = function(f) _coroutine.wrap = function(f)
local thread = coroutine.create(f) local thread = coroutine.create(f)
return function(...) return function(...)
local result_pack = table.pack(coroutine.resume(thread, ...)) return select(2, coroutine.resume(thread, ...))
local result, reason = result_pack[1], result_pack[2]
assert(result, reason)
return select(2, table.unpack(result_pack))
end end
end end

View File

@ -22,17 +22,18 @@ function pipe.createCoroutineStack(root, env, name)
function pco.yield(...) function pco.yield(...)
return _root_co.yield(nil, ...) return _root_co.yield(nil, ...)
end end
function pco.yield_all(...) function pco.yield_past(co, ...)
return _root_co.yield(true, ...) return _root_co.yield(co, ...)
end end
function pco.resume(co, ...) function pco.resume(co, ...)
checkArg(1, co, "thread") checkArg(1, co, "thread")
local args = table.pack(...) local args = table.pack(...)
while true do -- for consecutive sysyields while true do -- for consecutive sysyields
local result = table.pack(_root_co.resume(co, table.unpack(args, 1, args.n))) local result = table.pack(_root_co.resume(co, table.unpack(args, 1, args.n)))
local target = result[2] == true and pco.root or result[2]
if not result[1] or _root_co.status(co) == "dead" then if not result[1] or _root_co.status(co) == "dead" then
return table.unpack(result, 1, result.n) return table.unpack(result, 1, result.n)
elseif result[2] and pco.root ~= co then elseif target and target ~= co then
args = table.pack(_root_co.yield(table.unpack(result, 2, result.n))) args = table.pack(_root_co.yield(table.unpack(result, 2, result.n)))
else else
return true, table.unpack(result, 3, result.n) return true, table.unpack(result, 3, result.n)
@ -70,7 +71,8 @@ local pipe_stream =
return self return self
end end
-- not reading, it is requesting a yield -- not reading, it is requesting a yield
result = table.pack(coroutine.yield_all(table.unpack(result, 2, result.n))) -- yield_past(true) will exit this coroutine stack
result = table.pack(coroutine.yield_past(true, table.unpack(result, 2, result.n)))
result = table.pack(coroutine.resume(self.next, table.unpack(result, 1, result.n))) -- the request was for an event result = table.pack(coroutine.resume(self.next, table.unpack(result, 1, result.n))) -- the request was for an event
end end
end, end,
@ -113,7 +115,7 @@ local pipe_stream =
-- natural yield (i.e. for events). To differentiate this yield from natural -- natural yield (i.e. for events). To differentiate this yield from natural
-- yields we set read_mode here, which the pipe_stream write detects -- yields we set read_mode here, which the pipe_stream write detects
self.read_mode = true self.read_mode = true
coroutine.yield_all() coroutine.yield_past(self.next) -- next is the first coroutine in this stack
self.read_mode = false self.read_mode = false
end end
local result = string.sub(self.buffer, 1, n) local result = string.sub(self.buffer, 1, n)
@ -158,16 +160,24 @@ end
local chain_stream = local chain_stream =
{ {
read = function(self, value) read = function(self, value, ...)
if self.io_stream.closed then return nil end if self.io_stream.closed then return nil end
-- handler is currently on yield all [else we wouldn't have control here] -- wake up prog
local read_ok, ret = self.pco.resume(self.pco.root, value) self.ready = false -- the pipe proc sets this true when ios completes
-- ret can be non string when a process ends local ret = table.pack(coroutine.resume(self.pco.root, value, ...))
ret = type(ret) == "string" and ret or nil if coroutine.status(self.pco.root) == "dead" then
return select(read_ok and 2 or 1, nil, ret) return nil
elseif not ret[1] then
return table.unpack(ret, 1, ret.n)
end
if not self.ready then
-- prog yielded back without writing/reading
return self:read(coroutine.yield())
end
return ret[2]
end, end,
write = function(self, ...) write = function(self, ...)
return self:read(table.concat({...})) return self:read(...)
end, end,
close = function(self) close = function(self)
self.io_stream:close() self.io_stream:close()
@ -181,8 +191,8 @@ function pipe.popen(prog, mode, env)
end end
local r = mode == "r" local r = mode == "r"
local key = r and "read" or "write"
local chain = {}
-- to simplify the code - shell.execute is run within a function to pass (prog, env) -- to simplify the code - shell.execute is run within a function to pass (prog, env)
-- if cmd_proc were to come second (mode=="w") then the pipe_proc would have to pass -- if cmd_proc were to come second (mode=="w") then the pipe_proc would have to pass
-- the starting args. which is possible, just more complicated -- the starting args. which is possible, just more complicated
@ -194,22 +204,27 @@ function pipe.popen(prog, mode, env)
-- the stream needs its own process for io -- the stream needs its own process for io
local pipe_proc = process.load(function() local pipe_proc = process.load(function()
local n = r and 0 or "" local n = r and 0 or ""
local key = r and "read" or "write"
local ios = stream.io_stream local ios = stream.io_stream
while not ios.closed do while not ios.closed do
n = coroutine.yield_all(ios[key](ios, n)) -- read from pipe
local ret = table.pack(ios[key](ios, n))
stream.ready = true
-- yield outside the chain now
n = coroutine.yield_past(chain[1], table.unpack(ret, 1, ret.n))
end end
end, nil, nil, "pipe_handler") end, nil, nil, "pipe_handler")
local pipe_index = r and 2 or 1 chain[r and 1 or 2] = cmd_proc
local cmd_index = r and 1 or 2 chain[r and 2 or 1] = pipe_proc
local chain = {[cmd_index]=cmd_proc, [pipe_index]=pipe_proc}
-- link the cmd and pipe proc io -- link the cmd and pipe proc io
pipe.buildPipeChain(chain) pipe.buildPipeChain(chain)
local cmd_stack = process.info(chain[1]).data.coroutine_handler local cmd_data = process.info(chain[1]).data
local cmd_stack = cmd_data.coroutine_handler
-- store handle to io_stream for easy access later -- store handle to io_stream for easy access later
stream.io_stream = process.info(chain[1]).data.io[1].stream stream.io_stream = cmd_data.io[1].stream
stream.pco = cmd_stack stream.pco = cmd_stack
-- popen commands start out running, like threads -- popen commands start out running, like threads

View File

@ -213,11 +213,11 @@ function thread.create(fp, ...)
function mt.process.data.pull(_, timeout) function mt.process.data.pull(_, timeout)
mt.register(timeout) mt.register(timeout)
-- yield_all will yield this pco stack -- yield_past(root) will yield until out of this thread
-- the callback will resume this stack -- the callback will resume this stack
local event_data local event_data
repeat repeat
event_data = table.pack(t.pco.yield_all(timeout)) event_data = table.pack(t.pco.yield_past(t.pco.root, timeout))
-- during sleep, we may have been suspended -- during sleep, we may have been suspended
until t:status() ~= "suspended" until t:status() ~= "suspended"
return table.unpack(event_data, 1, event_data.n) return table.unpack(event_data, 1, event_data.n)