From 6e9a66dbf30ffea292381a5fbb5091707d0d8306 Mon Sep 17 00:00:00 2001 From: Delyan Angelov Date: Tue, 29 Oct 2024 18:32:31 +0200 Subject: [PATCH] picoev: extract common code to a trace_fd/1 function, cleanup --- vlib/picoev/{errors.v => logging.v} | 5 +++ vlib/picoev/loop_default.c.v | 7 +--- vlib/picoev/picoev.v | 64 +++++------------------------ vlib/picoev/socket_util.c.v | 29 +++---------- 4 files changed, 22 insertions(+), 83 deletions(-) rename vlib/picoev/{errors.v => logging.v} (57%) diff --git a/vlib/picoev/errors.v b/vlib/picoev/logging.v similarity index 57% rename from vlib/picoev/errors.v rename to vlib/picoev/logging.v index dae7c451ac..c86729d968 100644 --- a/vlib/picoev/errors.v +++ b/vlib/picoev/logging.v @@ -4,3 +4,8 @@ module picoev fn elog(msg string) { eprintln(msg) } + +@[if trace_fd ?] +fn trace_fd(msg string) { + eprintln(msg) +} diff --git a/vlib/picoev/loop_default.c.v b/vlib/picoev/loop_default.c.v index 2ba37ede8d..2f366d5586 100644 --- a/vlib/picoev/loop_default.c.v +++ b/vlib/picoev/loop_default.c.v @@ -83,18 +83,13 @@ fn (mut pv Picoev) poll_once(max_wait_in_sec int) int { if C.FD_ISSET(target.fd, &writefds) != 0 { read_events |= picoev_write } - if read_events != 0 { - $if trace_fd ? { - eprintln('do callback ${target.fd}') - } - + trace_fd('do callback ${target.fd}') // do callback! unsafe { target.cb(target.fd, read_events, &pv) } } } } } - return 0 } diff --git a/vlib/picoev/picoev.v b/vlib/picoev/picoev.v index 08c4d230bb..eb24dfc9d8 100644 --- a/vlib/picoev/picoev.v +++ b/vlib/picoev/picoev.v @@ -88,10 +88,8 @@ pub: // init fills the `file_descriptors` array pub fn (mut pv Picoev) init() { - assert max_fds > 0 - + // assert max_fds > 0 pv.num_loops = 0 - for i in 0 .. max_fds { pv.file_descriptors[i] = &Target{} } @@ -103,21 +101,17 @@ pub fn (mut pv Picoev) add(fd int, events int, timeout int, callback voidptr) in if pv == unsafe { nil } || fd < 0 || fd >= max_fds { return -1 // Invalid arguments } - mut target := pv.file_descriptors[fd] target.fd = fd target.cb = callback target.loop_id = pv.loop.id target.events = 0 - if pv.update_events(fd, events | picoev_add) != 0 { if pv.delete(fd) != 0 { elog('Error during del') } - return -1 } - pv.set_timeout(fd, timeout) return 0 } @@ -135,18 +129,12 @@ pub fn (mut pv Picoev) delete(fd int) int { if fd < 0 || fd >= max_fds { return -1 // Invalid fd } - mut target := pv.file_descriptors[fd] - - $if trace_fd ? { - eprintln('remove ${fd}') - } - + trace_fd('remove ${fd}') if pv.update_events(fd, picoev_del) != 0 { elog('Error during update_events. event: `picoev.picoev_del`') return -1 } - pv.set_timeout(fd, 0) target.loop_id = -1 target.fd = 0 @@ -156,19 +144,16 @@ pub fn (mut pv Picoev) delete(fd int) int { fn (mut pv Picoev) loop_once(max_wait_in_sec int) int { pv.loop.now = get_time() - if pv.poll_once(max_wait_in_sec) != 0 { elog('Error during poll_once') return -1 } - - if max_wait_in_sec != 0 { - pv.loop.now = get_time() // Update loop start time again if waiting occurred - } else { + if max_wait_in_sec == 0 { // If no waiting, skip timeout handling for potential performance optimization return 0 } - + // Update loop start time again if waiting occurred + pv.loop.now = get_time() pv.handle_timeout() return 0 } @@ -178,10 +163,10 @@ fn (mut pv Picoev) loop_once(max_wait_in_sec int) int { @[direct_array_access; inline] fn (mut pv Picoev) set_timeout(fd int, secs int) { assert fd < max_fds - if secs != 0 { - pv.timeouts[fd] = pv.loop.now + secs - } else { + if secs == 0 { pv.timeouts.delete(fd) + } else { + pv.timeouts[fd] = pv.loop.now + secs } } @@ -191,13 +176,11 @@ fn (mut pv Picoev) set_timeout(fd int, secs int) { @[direct_array_access; inline] fn (mut pv Picoev) handle_timeout() { mut to_remove := []int{} - for fd, timeout in pv.timeouts { if timeout <= pv.loop.now { to_remove << fd } } - for fd in to_remove { target := pv.file_descriptors[fd] assert target.loop_id == pv.loop.id @@ -210,26 +193,19 @@ fn (mut pv Picoev) handle_timeout() { fn accept_callback(listen_fd int, events int, cb_arg voidptr) { mut pv := unsafe { &Picoev(cb_arg) } accepted_fd := accept(listen_fd) - if accepted_fd == -1 { if fatal_socket_error(accepted_fd) == false { return } - elog('Error during accept') return } - if accepted_fd >= max_fds { // should never happen close_socket(accepted_fd) return } - - $if trace_fd ? { - eprintln('accept ${accepted_fd}') - } - + trace_fd('accept ${accepted_fd}') setup_sock(accepted_fd) or { elog('setup_sock failed, fd: ${accepted_fd}, listen_fd: ${listen_fd}, err: ${err.code()}') pv.error_callback(pv.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{}, @@ -256,17 +232,12 @@ fn raw_callback(fd int, events int, context voidptr) { defer { pv.idx[fd] = 0 } - if events & picoev_timeout != 0 { - $if trace_fd ? { - eprintln('timeout ${fd}') - } - + trace_fd('timeout ${fd}') if !isnil(pv.raw_callback) { pv.raw_callback(mut pv, fd, events) return } - pv.close_conn(fd) return } else if events & picoev_read != 0 { @@ -275,13 +246,11 @@ fn raw_callback(fd int, events int, context voidptr) { pv.raw_callback(mut pv, fd, events) return } - mut request_buffer := pv.buf unsafe { request_buffer += fd * pv.max_read // pointer magic } mut req := picohttpparser.Request{} - // Response init mut response_buffer := pv.out unsafe { @@ -293,7 +262,6 @@ fn raw_callback(fd int, events int, context voidptr) { buf: response_buffer date: pv.date.str } - for { // Request parsing loop r := req_read(fd, request_buffer, pv.max_read, pv.idx[fd]) // Get data from socket @@ -305,15 +273,12 @@ fn raw_callback(fd int, events int, context voidptr) { if fatal_socket_error(fd) == false { return } - elog('Error during req_read') - // fatal error pv.close_conn(fd) return } pv.idx[fd] += r - mut s := unsafe { tos(request_buffer, pv.idx[fd]) } pret := req.parse_request(s) or { // Parse error @@ -323,7 +288,6 @@ fn raw_callback(fd int, events int, context voidptr) { if pret > 0 { // Success break } - assert pret == -2 // request is incomplete, continue the loop if pv.idx[fd] == sizeof(request_buffer) { @@ -331,7 +295,6 @@ fn raw_callback(fd int, events int, context voidptr) { return } } - // Callback (should call .end() itself) pv.cb(pv.user_data, req, mut &res) } else if events & picoev_write != 0 { @@ -354,7 +317,6 @@ pub fn new(config Config) !&Picoev { elog('Error during listen: ${err}') return err } - mut pv := &Picoev{ num_loops: 1 cb: config.cb @@ -366,12 +328,10 @@ pub fn new(config Config) !&Picoev { max_read: config.max_read max_write: config.max_write } - if isnil(pv.raw_callback) { pv.buf = unsafe { malloc_noscan(max_fds * config.max_read + 1) } pv.out = unsafe { malloc_noscan(max_fds * config.max_write + 1) } } - // epoll on linux // kqueue on macos and bsd // select on windows and others @@ -382,15 +342,12 @@ pub fn new(config Config) !&Picoev { } $else { pv.loop = create_select_loop(0) or { panic(err) } } - if pv.loop == unsafe { nil } { elog('Failed to create loop') close_socket(listening_socket_fd) return unsafe { nil } } - pv.init() - pv.add(listening_socket_fd, picoev_read, 0, accept_callback) return pv } @@ -399,7 +356,6 @@ pub fn new(config Config) !&Picoev { // See also picoev.new(). pub fn (mut pv Picoev) serve() { spawn update_date_string(mut pv) - for { pv.loop_once(1) } diff --git a/vlib/picoev/socket_util.c.v b/vlib/picoev/socket_util.c.v index 988b1aa1d4..7357536f4e 100644 --- a/vlib/picoev/socket_util.c.v +++ b/vlib/picoev/socket_util.c.v @@ -30,10 +30,7 @@ fn accept(fd int) int { @[inline] fn close_socket(fd int) { - $if trace_fd ? { - eprintln('close ${fd}') - } - + trace_fd('close ${fd}') $if windows { C.closesocket(fd) } $else { @@ -44,11 +41,9 @@ fn close_socket(fd int) { @[inline] fn setup_sock(fd int) ! { flag := 1 - if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 { return error('setup_sock.setup_sock failed') } - $if freebsd { if C.fcntl(fd, C.F_SETFL, C.SOCK_NONBLOCK) != 0 { return error('fcntl failed') @@ -88,11 +83,7 @@ fn fatal_socket_error(fd int) bool { return false } } - - $if trace_fd ? { - eprintln('fatal error ${fd}: ${C.errno}') - } - + trace_fd('fatal error ${fd}: ${C.errno}') return true } @@ -103,22 +94,16 @@ fn listen(config Config) !int { if fd == -1 { return error('Failed to create socket') } - - $if trace_fd ? { - eprintln('listen: ${fd}') - } - + trace_fd('listen: ${fd}') // Setting flags for socket flag := 1 flag_zero := 0 net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)))! - if config.family == .ip6 { // set socket to dualstack so connections to both ipv4 and ipv6 addresses // can be accepted net.socket_error(C.setsockopt(fd, C.IPPROTO_IPV6, C.IPV6_V6ONLY, &flag_zero, sizeof(int)))! } - $if linux { // epoll socket options net.socket_error(C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)))! @@ -131,20 +116,18 @@ fn listen(config Config) !int { sizeof(int)))! } } - // addr settings saddr := '${config.host}:${config.port}' - addrs := net.resolve_addrs(saddr, config.family, .tcp) or { panic(err) } + addrs := net.resolve_addrs(saddr, config.family, .tcp) or { + panic('Error while resolving `${saddr}`, err: ${err}') + } addr := addrs[0] alen := addr.len() - net.socket_error_message(C.bind(fd, voidptr(&addr), alen), 'binding to ${saddr} failed')! net.socket_error_message(C.listen(fd, C.SOMAXCONN), 'listening on ${saddr} with maximum backlog pending queue of ${C.SOMAXCONN}, failed')! - setup_sock(fd) or { config.err_cb(config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{}, err) } - return fd }