picoev: extract common code to a trace_fd/1 function, cleanup

This commit is contained in:
Delyan Angelov 2024-10-29 18:32:31 +02:00
parent 3939737a8d
commit 6e9a66dbf3
No known key found for this signature in database
GPG Key ID: 66886C0F12D595ED
4 changed files with 22 additions and 83 deletions

View File

@ -4,3 +4,8 @@ module picoev
fn elog(msg string) { fn elog(msg string) {
eprintln(msg) eprintln(msg)
} }
@[if trace_fd ?]
fn trace_fd(msg string) {
eprintln(msg)
}

View File

@ -83,18 +83,13 @@ fn (mut pv Picoev) poll_once(max_wait_in_sec int) int {
if C.FD_ISSET(target.fd, &writefds) != 0 { if C.FD_ISSET(target.fd, &writefds) != 0 {
read_events |= picoev_write read_events |= picoev_write
} }
if read_events != 0 { if read_events != 0 {
$if trace_fd ? { trace_fd('do callback ${target.fd}')
eprintln('do callback ${target.fd}')
}
// do callback! // do callback!
unsafe { target.cb(target.fd, read_events, &pv) } unsafe { target.cb(target.fd, read_events, &pv) }
} }
} }
} }
} }
return 0 return 0
} }

View File

@ -88,10 +88,8 @@ pub:
// init fills the `file_descriptors` array // init fills the `file_descriptors` array
pub fn (mut pv Picoev) init() { pub fn (mut pv Picoev) init() {
assert max_fds > 0 // assert max_fds > 0
pv.num_loops = 0 pv.num_loops = 0
for i in 0 .. max_fds { for i in 0 .. max_fds {
pv.file_descriptors[i] = &Target{} pv.file_descriptors[i] = &Target{}
} }
@ -103,21 +101,17 @@ pub fn (mut pv Picoev) add(fd int, events int, timeout int, callback voidptr) in
if pv == unsafe { nil } || fd < 0 || fd >= max_fds { if pv == unsafe { nil } || fd < 0 || fd >= max_fds {
return -1 // Invalid arguments return -1 // Invalid arguments
} }
mut target := pv.file_descriptors[fd] mut target := pv.file_descriptors[fd]
target.fd = fd target.fd = fd
target.cb = callback target.cb = callback
target.loop_id = pv.loop.id target.loop_id = pv.loop.id
target.events = 0 target.events = 0
if pv.update_events(fd, events | picoev_add) != 0 { if pv.update_events(fd, events | picoev_add) != 0 {
if pv.delete(fd) != 0 { if pv.delete(fd) != 0 {
elog('Error during del') elog('Error during del')
} }
return -1 return -1
} }
pv.set_timeout(fd, timeout) pv.set_timeout(fd, timeout)
return 0 return 0
} }
@ -135,18 +129,12 @@ pub fn (mut pv Picoev) delete(fd int) int {
if fd < 0 || fd >= max_fds { if fd < 0 || fd >= max_fds {
return -1 // Invalid fd return -1 // Invalid fd
} }
mut target := pv.file_descriptors[fd] mut target := pv.file_descriptors[fd]
trace_fd('remove ${fd}')
$if trace_fd ? {
eprintln('remove ${fd}')
}
if pv.update_events(fd, picoev_del) != 0 { if pv.update_events(fd, picoev_del) != 0 {
elog('Error during update_events. event: `picoev.picoev_del`') elog('Error during update_events. event: `picoev.picoev_del`')
return -1 return -1
} }
pv.set_timeout(fd, 0) pv.set_timeout(fd, 0)
target.loop_id = -1 target.loop_id = -1
target.fd = 0 target.fd = 0
@ -156,19 +144,16 @@ pub fn (mut pv Picoev) delete(fd int) int {
fn (mut pv Picoev) loop_once(max_wait_in_sec int) int { fn (mut pv Picoev) loop_once(max_wait_in_sec int) int {
pv.loop.now = get_time() pv.loop.now = get_time()
if pv.poll_once(max_wait_in_sec) != 0 { if pv.poll_once(max_wait_in_sec) != 0 {
elog('Error during poll_once') elog('Error during poll_once')
return -1 return -1
} }
if max_wait_in_sec == 0 {
if max_wait_in_sec != 0 {
pv.loop.now = get_time() // Update loop start time again if waiting occurred
} else {
// If no waiting, skip timeout handling for potential performance optimization // If no waiting, skip timeout handling for potential performance optimization
return 0 return 0
} }
// Update loop start time again if waiting occurred
pv.loop.now = get_time()
pv.handle_timeout() pv.handle_timeout()
return 0 return 0
} }
@ -178,10 +163,10 @@ fn (mut pv Picoev) loop_once(max_wait_in_sec int) int {
@[direct_array_access; inline] @[direct_array_access; inline]
fn (mut pv Picoev) set_timeout(fd int, secs int) { fn (mut pv Picoev) set_timeout(fd int, secs int) {
assert fd < max_fds assert fd < max_fds
if secs != 0 { if secs == 0 {
pv.timeouts[fd] = pv.loop.now + secs
} else {
pv.timeouts.delete(fd) pv.timeouts.delete(fd)
} else {
pv.timeouts[fd] = pv.loop.now + secs
} }
} }
@ -191,13 +176,11 @@ fn (mut pv Picoev) set_timeout(fd int, secs int) {
@[direct_array_access; inline] @[direct_array_access; inline]
fn (mut pv Picoev) handle_timeout() { fn (mut pv Picoev) handle_timeout() {
mut to_remove := []int{} mut to_remove := []int{}
for fd, timeout in pv.timeouts { for fd, timeout in pv.timeouts {
if timeout <= pv.loop.now { if timeout <= pv.loop.now {
to_remove << fd to_remove << fd
} }
} }
for fd in to_remove { for fd in to_remove {
target := pv.file_descriptors[fd] target := pv.file_descriptors[fd]
assert target.loop_id == pv.loop.id assert target.loop_id == pv.loop.id
@ -210,26 +193,19 @@ fn (mut pv Picoev) handle_timeout() {
fn accept_callback(listen_fd int, events int, cb_arg voidptr) { fn accept_callback(listen_fd int, events int, cb_arg voidptr) {
mut pv := unsafe { &Picoev(cb_arg) } mut pv := unsafe { &Picoev(cb_arg) }
accepted_fd := accept(listen_fd) accepted_fd := accept(listen_fd)
if accepted_fd == -1 { if accepted_fd == -1 {
if fatal_socket_error(accepted_fd) == false { if fatal_socket_error(accepted_fd) == false {
return return
} }
elog('Error during accept') elog('Error during accept')
return return
} }
if accepted_fd >= max_fds { if accepted_fd >= max_fds {
// should never happen // should never happen
close_socket(accepted_fd) close_socket(accepted_fd)
return return
} }
trace_fd('accept ${accepted_fd}')
$if trace_fd ? {
eprintln('accept ${accepted_fd}')
}
setup_sock(accepted_fd) or { setup_sock(accepted_fd) or {
elog('setup_sock failed, fd: ${accepted_fd}, listen_fd: ${listen_fd}, err: ${err.code()}') elog('setup_sock failed, fd: ${accepted_fd}, listen_fd: ${listen_fd}, err: ${err.code()}')
pv.error_callback(pv.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{}, pv.error_callback(pv.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
@ -256,17 +232,12 @@ fn raw_callback(fd int, events int, context voidptr) {
defer { defer {
pv.idx[fd] = 0 pv.idx[fd] = 0
} }
if events & picoev_timeout != 0 { if events & picoev_timeout != 0 {
$if trace_fd ? { trace_fd('timeout ${fd}')
eprintln('timeout ${fd}')
}
if !isnil(pv.raw_callback) { if !isnil(pv.raw_callback) {
pv.raw_callback(mut pv, fd, events) pv.raw_callback(mut pv, fd, events)
return return
} }
pv.close_conn(fd) pv.close_conn(fd)
return return
} else if events & picoev_read != 0 { } else if events & picoev_read != 0 {
@ -275,13 +246,11 @@ fn raw_callback(fd int, events int, context voidptr) {
pv.raw_callback(mut pv, fd, events) pv.raw_callback(mut pv, fd, events)
return return
} }
mut request_buffer := pv.buf mut request_buffer := pv.buf
unsafe { unsafe {
request_buffer += fd * pv.max_read // pointer magic request_buffer += fd * pv.max_read // pointer magic
} }
mut req := picohttpparser.Request{} mut req := picohttpparser.Request{}
// Response init // Response init
mut response_buffer := pv.out mut response_buffer := pv.out
unsafe { unsafe {
@ -293,7 +262,6 @@ fn raw_callback(fd int, events int, context voidptr) {
buf: response_buffer buf: response_buffer
date: pv.date.str date: pv.date.str
} }
for { for {
// Request parsing loop // Request parsing loop
r := req_read(fd, request_buffer, pv.max_read, pv.idx[fd]) // Get data from socket r := req_read(fd, request_buffer, pv.max_read, pv.idx[fd]) // Get data from socket
@ -305,15 +273,12 @@ fn raw_callback(fd int, events int, context voidptr) {
if fatal_socket_error(fd) == false { if fatal_socket_error(fd) == false {
return return
} }
elog('Error during req_read') elog('Error during req_read')
// fatal error // fatal error
pv.close_conn(fd) pv.close_conn(fd)
return return
} }
pv.idx[fd] += r pv.idx[fd] += r
mut s := unsafe { tos(request_buffer, pv.idx[fd]) } mut s := unsafe { tos(request_buffer, pv.idx[fd]) }
pret := req.parse_request(s) or { pret := req.parse_request(s) or {
// Parse error // Parse error
@ -323,7 +288,6 @@ fn raw_callback(fd int, events int, context voidptr) {
if pret > 0 { // Success if pret > 0 { // Success
break break
} }
assert pret == -2 assert pret == -2
// request is incomplete, continue the loop // request is incomplete, continue the loop
if pv.idx[fd] == sizeof(request_buffer) { if pv.idx[fd] == sizeof(request_buffer) {
@ -331,7 +295,6 @@ fn raw_callback(fd int, events int, context voidptr) {
return return
} }
} }
// Callback (should call .end() itself) // Callback (should call .end() itself)
pv.cb(pv.user_data, req, mut &res) pv.cb(pv.user_data, req, mut &res)
} else if events & picoev_write != 0 { } else if events & picoev_write != 0 {
@ -354,7 +317,6 @@ pub fn new(config Config) !&Picoev {
elog('Error during listen: ${err}') elog('Error during listen: ${err}')
return err return err
} }
mut pv := &Picoev{ mut pv := &Picoev{
num_loops: 1 num_loops: 1
cb: config.cb cb: config.cb
@ -366,12 +328,10 @@ pub fn new(config Config) !&Picoev {
max_read: config.max_read max_read: config.max_read
max_write: config.max_write max_write: config.max_write
} }
if isnil(pv.raw_callback) { if isnil(pv.raw_callback) {
pv.buf = unsafe { malloc_noscan(max_fds * config.max_read + 1) } pv.buf = unsafe { malloc_noscan(max_fds * config.max_read + 1) }
pv.out = unsafe { malloc_noscan(max_fds * config.max_write + 1) } pv.out = unsafe { malloc_noscan(max_fds * config.max_write + 1) }
} }
// epoll on linux // epoll on linux
// kqueue on macos and bsd // kqueue on macos and bsd
// select on windows and others // select on windows and others
@ -382,15 +342,12 @@ pub fn new(config Config) !&Picoev {
} $else { } $else {
pv.loop = create_select_loop(0) or { panic(err) } pv.loop = create_select_loop(0) or { panic(err) }
} }
if pv.loop == unsafe { nil } { if pv.loop == unsafe { nil } {
elog('Failed to create loop') elog('Failed to create loop')
close_socket(listening_socket_fd) close_socket(listening_socket_fd)
return unsafe { nil } return unsafe { nil }
} }
pv.init() pv.init()
pv.add(listening_socket_fd, picoev_read, 0, accept_callback) pv.add(listening_socket_fd, picoev_read, 0, accept_callback)
return pv return pv
} }
@ -399,7 +356,6 @@ pub fn new(config Config) !&Picoev {
// See also picoev.new(). // See also picoev.new().
pub fn (mut pv Picoev) serve() { pub fn (mut pv Picoev) serve() {
spawn update_date_string(mut pv) spawn update_date_string(mut pv)
for { for {
pv.loop_once(1) pv.loop_once(1)
} }

View File

@ -30,10 +30,7 @@ fn accept(fd int) int {
@[inline] @[inline]
fn close_socket(fd int) { fn close_socket(fd int) {
$if trace_fd ? { trace_fd('close ${fd}')
eprintln('close ${fd}')
}
$if windows { $if windows {
C.closesocket(fd) C.closesocket(fd)
} $else { } $else {
@ -44,11 +41,9 @@ fn close_socket(fd int) {
@[inline] @[inline]
fn setup_sock(fd int) ! { fn setup_sock(fd int) ! {
flag := 1 flag := 1
if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 { if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 {
return error('setup_sock.setup_sock failed') return error('setup_sock.setup_sock failed')
} }
$if freebsd { $if freebsd {
if C.fcntl(fd, C.F_SETFL, C.SOCK_NONBLOCK) != 0 { if C.fcntl(fd, C.F_SETFL, C.SOCK_NONBLOCK) != 0 {
return error('fcntl failed') return error('fcntl failed')
@ -88,11 +83,7 @@ fn fatal_socket_error(fd int) bool {
return false return false
} }
} }
trace_fd('fatal error ${fd}: ${C.errno}')
$if trace_fd ? {
eprintln('fatal error ${fd}: ${C.errno}')
}
return true return true
} }
@ -103,22 +94,16 @@ fn listen(config Config) !int {
if fd == -1 { if fd == -1 {
return error('Failed to create socket') return error('Failed to create socket')
} }
trace_fd('listen: ${fd}')
$if trace_fd ? {
eprintln('listen: ${fd}')
}
// Setting flags for socket // Setting flags for socket
flag := 1 flag := 1
flag_zero := 0 flag_zero := 0
net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)))! net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)))!
if config.family == .ip6 { if config.family == .ip6 {
// set socket to dualstack so connections to both ipv4 and ipv6 addresses // set socket to dualstack so connections to both ipv4 and ipv6 addresses
// can be accepted // can be accepted
net.socket_error(C.setsockopt(fd, C.IPPROTO_IPV6, C.IPV6_V6ONLY, &flag_zero, sizeof(int)))! net.socket_error(C.setsockopt(fd, C.IPPROTO_IPV6, C.IPV6_V6ONLY, &flag_zero, sizeof(int)))!
} }
$if linux { $if linux {
// epoll socket options // epoll socket options
net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)))! net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)))!
@ -131,20 +116,18 @@ fn listen(config Config) !int {
sizeof(int)))! sizeof(int)))!
} }
} }
// addr settings // addr settings
saddr := '${config.host}:${config.port}' saddr := '${config.host}:${config.port}'
addrs := net.resolve_addrs(saddr, config.family, .tcp) or { panic(err) } addrs := net.resolve_addrs(saddr, config.family, .tcp) or {
panic('Error while resolving `${saddr}`, err: ${err}')
}
addr := addrs[0] addr := addrs[0]
alen := addr.len() alen := addr.len()
net.socket_error_message(C.bind(fd, voidptr(&addr), alen), 'binding to ${saddr} failed')! net.socket_error_message(C.bind(fd, voidptr(&addr), alen), 'binding to ${saddr} failed')!
net.socket_error_message(C.listen(fd, C.SOMAXCONN), 'listening on ${saddr} with maximum backlog pending queue of ${C.SOMAXCONN}, failed')! net.socket_error_message(C.listen(fd, C.SOMAXCONN), 'listening on ${saddr} with maximum backlog pending queue of ${C.SOMAXCONN}, failed')!
setup_sock(fd) or { setup_sock(fd) or {
config.err_cb(config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{}, config.err_cb(config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err) err)
} }
return fd return fd
} }