From a9ebab06da6b1612e89fcd0e9ee4d0f5dc5aa6eb Mon Sep 17 00:00:00 2001 From: kbkpbot Date: Fri, 9 Feb 2024 02:18:29 +0800 Subject: [PATCH] net: fix non-blocking read/write (#20438) --- vlib/net/common.c.v | 72 ++++++++++++-- vlib/net/net_nix.c.v | 2 + vlib/net/net_windows.c.v | 3 + vlib/net/socket.v | 17 ++++ vlib/net/tcp.c.v | 155 +++++++++++++++++++++---------- vlib/net/tcp_non_blocking_test.v | 52 +++++++++++ vlib/net/tcp_read_line.c.v | 23 ++--- vlib/net/udp.c.v | 10 +- vlib/net/unix/stream.c.v | 20 +--- 9 files changed, 260 insertions(+), 94 deletions(-) create mode 100644 vlib/net/tcp_non_blocking_test.v diff --git a/vlib/net/common.c.v b/vlib/net/common.c.v index c009354815..db64eac5c8 100644 --- a/vlib/net/common.c.v +++ b/vlib/net/common.c.v @@ -33,20 +33,76 @@ pub struct ShutdownConfig { // By default it shuts it down in both directions, both for reading // and for writing. You can change that using `net.shutdown(handle, how: .read)` // or `net.shutdown(handle, how: .write)` +// In non-blocking mode, `shutdown()` may not succeed immediately, +// so `select` is also used to make sure that the function doesn't return an incorrect result. pub fn shutdown(handle int, config ShutdownConfig) int { - $if windows { - return C.shutdown(handle, int(config.how)) + res := C.shutdown(handle, int(config.how)) + $if !net_nonblocking_sockets ? { + return res } $else { - return C.shutdown(handle, int(config.how)) + if res == 0 { + return 0 + } + ecode := error_code() + if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1 + && ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) { + write_result := select_deadline(handle, .write, time.now().add(connect_timeout)) or { + false + } + err := 0 + len := sizeof(err) + xyz := C.getsockopt(handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len) + if xyz == 0 && err == 0 { + return 0 + } + if write_result { + if xyz == 0 { + return err + } + return 0 + } + } + return -ecode } } // close a socket, given its file descriptor `handle`. +// In non-blocking mode, if `close()` does not succeed immediately, +// it causes an error to be propagated to `TcpSocket.close()`, which is not intended. +// Therefore, `select` is used just like `connect()`. pub fn close(handle int) ! { - $if windows { - socket_error(C.closesocket(handle))! + res := $if windows { + C.closesocket(handle) } $else { - socket_error(C.close(handle))! + C.close(handle) + } + $if !net_nonblocking_sockets ? { + socket_error(res)! + return + } $else { + if res == 0 { + return + } + ecode := error_code() + if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1 + && ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) { + write_result := select_deadline(handle, .write, time.now().add(connect_timeout))! + err := 0 + len := sizeof(err) + xyz := C.getsockopt(handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len) + if xyz == 0 && err == 0 { + return + } + if write_result { + if xyz == 0 { + wrap_error(err)! + return + } + return + } + return err_timed_out + } + wrap_error(ecode)! } } @@ -95,8 +151,8 @@ fn select_deadline(handle int, test Select, deadline time.Time) !bool { for infinite || time.now() <= deadline { timeout := if infinite { net.infinite_timeout } else { deadline - time.now() } ready := @select(handle, test, timeout) or { - if err.code() == 4 { - // Spurious wakeup from signal, keep waiting + if err.code() == C.EINTR { + // errno is 4, Spurious wakeup from signal, keep waiting continue } diff --git a/vlib/net/net_nix.c.v b/vlib/net/net_nix.c.v index fc224675a0..9992aa2bb4 100644 --- a/vlib/net/net_nix.c.v +++ b/vlib/net/net_nix.c.v @@ -21,8 +21,10 @@ fn init() { } pub const msg_nosignal = 0x4000 +pub const msg_dontwait = C.MSG_DONTWAIT pub const error_ewouldblock = C.EWOULDBLOCK pub const error_einprogress = C.EINPROGRESS +pub const error_eagain = C.EAGAIN fn C.unlink(&char) int diff --git a/vlib/net/net_windows.c.v b/vlib/net/net_windows.c.v index 9d53db6587..df118e2f90 100644 --- a/vlib/net/net_windows.c.v +++ b/vlib/net/net_windows.c.v @@ -10,8 +10,11 @@ const is_windows = true // Constants that windows needs pub const fionbio = C.FIONBIO pub const msg_nosignal = 0 +pub const msg_dontwait = 0 + pub const error_ewouldblock = WsaError.wsaewouldblock pub const error_einprogress = WsaError.wsaeinprogress +pub const error_eagain = WsaError.wsaewouldblock // on windows, is also wsaewouldblock const wsa_v22 = 0x202 diff --git a/vlib/net/socket.v b/vlib/net/socket.v index f2c72fa0db..cce30356e1 100644 --- a/vlib/net/socket.v +++ b/vlib/net/socket.v @@ -9,3 +9,20 @@ pub: pub fn (s &Socket) address() !Addr { return addr_from_socket_handle(s.handle) } + +// set_blocking will change the state of the socket to either blocking, +// when state is true, or non blocking (false). +pub fn set_blocking(handle int, state bool) ! { + $if windows { + t := if state { u32(0) } else { u32(1) } + socket_error(C.ioctlsocket(handle, fionbio, &t))! + } $else { + mut flags := C.fcntl(handle, C.F_GETFL, 0) + if state { + flags &= ~C.O_NONBLOCK + } else { + flags |= C.O_NONBLOCK + } + socket_error(C.fcntl(handle, C.F_SETFL, flags))! + } +} diff --git a/vlib/net/tcp.c.v b/vlib/net/tcp.c.v index 8943fc8a75..ba11990274 100644 --- a/vlib/net/tcp.c.v +++ b/vlib/net/tcp.c.v @@ -16,7 +16,7 @@ mut: read_deadline time.Time read_timeout time.Duration write_timeout time.Duration - is_blocking bool + is_blocking bool = true } pub fn dial_tcp(oaddress string) !&TcpConn { @@ -49,11 +49,16 @@ pub fn dial_tcp(oaddress string) !&TcpConn { continue } - return &TcpConn{ + mut conn := &TcpConn{ sock: s read_timeout: net.tcp_default_read_timeout write_timeout: net.tcp_default_write_timeout } + // The blocking / non-blocking mode is determined before the connection is established. + $if net_nonblocking_sockets ? { + conn.is_blocking = false + } + return conn } // Once we've failed now try and explain why we failed to connect @@ -92,11 +97,16 @@ pub fn dial_tcp_with_bind(saddr string, laddr string) !&TcpConn { continue } - return &TcpConn{ + mut conn := &TcpConn{ sock: s read_timeout: net.tcp_default_read_timeout write_timeout: net.tcp_default_write_timeout } + // The blocking / non-blocking mode is determined before the connection is established. + $if net_nonblocking_sockets ? { + conn.is_blocking = false + } + return conn } // failed return error('dial_tcp_with_bind failed for address ${saddr}') @@ -110,10 +120,25 @@ pub fn (mut c TcpConn) close() ! { } pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int { + mut should_ewouldblock := false mut res := $if is_coroutine ? { - wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout) } $else { - wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + // The new socket returned by accept() behaves differently in blocking mode and needs special treatment. + mut has_data := true + if c.is_blocking { + if ok := @select(c.sock.handle, .read, 1) { + has_data = ok + } else { + false + } + } + if has_data { + C.recv(c.sock.handle, voidptr(buf_ptr), len, msg_dontwait) + } else { + should_ewouldblock = true + -1 + } } $if trace_tcp ? { eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') @@ -126,13 +151,13 @@ pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int { } return res } - code := error_code() - if code == int(error_ewouldblock) { + code := if should_ewouldblock { int(error_ewouldblock) } else { error_code() } + if code in [int(error_ewouldblock), int(error_eagain), C.EINTR] { c.wait_for_read()! res = $if is_coroutine ? { - wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout) } $else { - wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + C.recv(c.sock.handle, voidptr(buf_ptr), len, msg_dontwait) } $if trace_tcp ? { eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') @@ -193,7 +218,7 @@ pub fn (mut c TcpConn) write_ptr(b &u8, len int) !int { } if sent < 0 { code := error_code() - if code == int(error_ewouldblock) { + if code in [int(error_ewouldblock), int(error_eagain), C.EINTR] { c.wait_for_write()! continue } else { @@ -291,6 +316,7 @@ pub mut: mut: accept_timeout time.Duration accept_deadline time.Time + is_blocking bool = true } @[params] @@ -316,11 +342,48 @@ pub fn listen_tcp(family AddrFamily, saddr string, options ListenOptions) !&TcpL // cast to the correct type alen := addr.len() socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${saddr} failed')! - socket_error_message(C.listen(s.handle, options.backlog), 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')! - return &TcpListener{ - sock: s - accept_deadline: no_deadline - accept_timeout: infinite_timeout + mut res := C.listen(s.handle, options.backlog) + if res == 0 { + mut listener := &TcpListener{ + sock: s + accept_deadline: no_deadline + accept_timeout: infinite_timeout + } + // The blocking / non-blocking mode is determined before the connection is established. + $if net_nonblocking_sockets ? { + listener.is_blocking = false + } + return listener + } + + $if !net_nonblocking_sockets ? { + socket_error_message(res, 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')! + return &TcpListener(unsafe { nil }) // for compiler passed + } $else { + // non-blocking sockets may also not succeed immediately when they listen() and need to check the status and take action accordingly. + for { + code := error_code() + if code in [int(error_einprogress), int(error_ewouldblock), int(error_eagain), C.EINTR] { + @select(s.handle, .read, net.connect_timeout)! + res = C.listen(s.handle, options.backlog) + if res == 0 { + break + } + } else { + socket_error_message(res, 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')! + break // for compiler passed + } + } + mut listener := &TcpListener{ + sock: s + accept_deadline: no_deadline + accept_timeout: infinite_timeout + } + // The blocking / non-blocking mode is determined before the connection is established. + $if net_nonblocking_sockets ? { + listener.is_blocking = false + } + return listener } } @@ -350,27 +413,39 @@ pub fn (mut l TcpListener) accept_only() !&TcpConn { eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}') } + // The blocking mode `accept()` does not support a timeout option, so `select` is used instead. + $if !is_coroutine ? { + if l.is_blocking { + l.wait_for_accept()! + } + } + mut new_handle := $if is_coroutine ? { C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) } $else { C.accept(l.sock.handle, 0, 0) } + + if !l.is_blocking && new_handle <= 0 { + code := error_code() + if code in [int(error_einprogress), int(error_ewouldblock), int(error_eagain), C.EINTR] { + l.wait_for_accept()! + new_handle = $if is_coroutine ? { + C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) + } $else { + C.accept(l.sock.handle, 0, 0) + } + } + } if new_handle <= 0 { - l.wait_for_accept()! - new_handle = $if is_coroutine ? { - C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) - } $else { - C.accept(l.sock.handle, 0, 0) - } - if new_handle == -1 || new_handle == 0 { - return error('accept failed') - } + return error('accept failed') } return &TcpConn{ handle: new_handle read_timeout: net.tcp_default_read_timeout write_timeout: net.tcp_default_write_timeout + is_blocking: l.is_blocking } } @@ -426,15 +501,13 @@ fn new_tcp_socket(family AddrFamily) !TcpSocket { // we shouldn't be using ioctlsocket in the 21st century // use the non-blocking socket option instead please :) + // Some options need to be set before the connection is established, otherwise they will not work. s.set_default_options()! - $if !net_blocking_sockets ? { - $if windows { - t := u32(1) // true - socket_error(C.ioctlsocket(handle, fionbio, &t))! - } $else { - socket_error(C.fcntl(handle, C.F_SETFL, C.fcntl(handle, C.F_GETFL) | C.O_NONBLOCK))! - } + // Set the desired "blocking/non-blocking" mode before the connection is established, + // and do not change it once the connection is successful. + $if net_nonblocking_sockets ? { + set_blocking(handle, false)! } return s } @@ -452,14 +525,6 @@ fn tcp_socket_from_handle(sockfd int) !TcpSocket { } s.set_default_options()! - $if !net_blocking_sockets ? { - $if windows { - t := u32(1) // true - socket_error(C.ioctlsocket(sockfd, fionbio, &t))! - } $else { - socket_error(C.fcntl(sockfd, C.F_SETFL, C.fcntl(sockfd, C.F_GETFL) | C.O_NONBLOCK))! - } - } return s } @@ -534,14 +599,10 @@ fn (mut s TcpSocket) close() ! { return close(s.handle) } -fn (mut s TcpSocket) @select(test Select, timeout time.Duration) !bool { - return @select(s.handle, test, timeout) -} - const connect_timeout = 5 * time.second fn (mut s TcpSocket) connect(a Addr) ! { - $if !net_blocking_sockets ? { + $if net_nonblocking_sockets ? { res := $if is_coroutine ? { C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout) } $else { @@ -553,8 +614,8 @@ fn (mut s TcpSocket) connect(a Addr) ! { ecode := error_code() // On nix non-blocking sockets we expect einprogress // On windows we expect res == -1 && error_code() == ewouldblock - if (is_windows && ecode == int(error_ewouldblock)) - || (!is_windows && res == -1 && ecode == int(error_einprogress)) { + if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1 + && ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) { // The socket is nonblocking and the connection cannot be completed // immediately. (UNIX domain sockets failed with EAGAIN instead.) // It is possible to select(2) or poll(2) for completion by selecting @@ -563,7 +624,7 @@ fn (mut s TcpSocket) connect(a Addr) ! { // determine whether connect() completed successfully (SO_ERROR is zero) or // unsuccessfully (SO_ERROR is one of the usual error codes listed here, // ex‐ plaining the reason for the failure). - write_result := s.@select(.write, net.connect_timeout)! + write_result := @select(s.handle, .write, net.connect_timeout)! err := 0 len := sizeof(err) xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len) diff --git a/vlib/net/tcp_non_blocking_test.v b/vlib/net/tcp_non_blocking_test.v new file mode 100644 index 0000000000..64f619c100 --- /dev/null +++ b/vlib/net/tcp_non_blocking_test.v @@ -0,0 +1,52 @@ +import net +import time + +fn server_thread(c_chan chan int) { + errors_no_data := [net.err_timed_out.code(), int(net.error_ewouldblock), int(net.error_eagain), + C.EINTR] + mut buf := []u8{len: 128} + mut times := 0 + mut read_len := 0 + mut listener := net.listen_tcp(.ip, ':22444') or { panic(err) } + c_chan <- 1 + mut server := listener.accept() or { panic(err) } + server.set_read_timeout(2 * time.second) + server.set_blocking(false) or { panic(err) } + read_len = server.read(mut buf) or { // nothing can be read yet + assert err.code() in errors_no_data + -1 + } + assert read_len == -1 // ensure there is an error with no data + read_len = server.read(mut buf) or { // nothing can be read yet + assert err.code() in errors_no_data + -2 + } + assert read_len == -2 // ensure there is an error with no data + c_chan <- 2 + for times < 10 { + times++ + time.sleep(1 * time.millisecond) + read_len = server.read(mut buf) or { + if err.code() in errors_no_data { + continue + } else { + panic(err) + } + } + if read_len > 0 { + break + } + } + assert unsafe { tos_clone(&buf[0]) == 'hello' } +} + +fn test_non_blocking_read() { + mut c_chan := chan int{cap: 1} + server := spawn server_thread(c_chan) + _ := <-c_chan // 1 + mut conn := net.dial_tcp('127.0.0.1:22444') or { panic(err) } + conn.set_blocking(false) or { panic(err) } + _ := <-c_chan // 2 + conn.write('hello'.bytes()) or { panic(err) } + server.wait() +} diff --git a/vlib/net/tcp_read_line.c.v b/vlib/net/tcp_read_line.c.v index 9e6c9c7989..cc04f0e000 100644 --- a/vlib/net/tcp_read_line.c.v +++ b/vlib/net/tcp_read_line.c.v @@ -18,25 +18,16 @@ pub fn (mut con TcpConn) get_blocking() bool { // set_blocking will change the state of the connection to either blocking, // when state is true, or non blocking (false). -// The default for `net` tcp connections is the non blocking mode. +// The default for `net` tcp connections is the blocking mode. // Calling .read_line will set the connection to blocking mode. +// In general, changing the blocking mode after a successful connection may cause unexpected surprises, +// so this function is not recommended to be called anywhere but for this file. pub fn (mut con TcpConn) set_blocking(state bool) ! { - con.is_blocking = state - $if windows { - mut t := u32(0) - if !con.is_blocking { - t = 1 - } - socket_error(C.ioctlsocket(con.sock.handle, fionbio, &t))! - } $else { - mut flags := C.fcntl(con.sock.handle, C.F_GETFL, 0) - if state { - flags &= ~C.O_NONBLOCK - } else { - flags |= C.O_NONBLOCK - } - socket_error(C.fcntl(con.sock.handle, C.F_SETFL, flags))! + if con.is_blocking == state { + return } + con.is_blocking = state + set_blocking(con.sock.handle, state)! } // read_line is a *simple*, *non customizable*, blocking line reader. diff --git a/vlib/net/udp.c.v b/vlib/net/udp.c.v index 932efb5184..68e002f721 100644 --- a/vlib/net/udp.c.v +++ b/vlib/net/udp.c.v @@ -211,14 +211,8 @@ fn new_udp_socket(local_addr Addr) !&UdpSocket { s.set_dualstack(true)! } - $if !net_blocking_sockets ? { - // NOTE: refer to comments in tcp.v - $if windows { - t := u32(1) // true - socket_error(C.ioctlsocket(sockfd, fionbio, &t))! - } $else { - socket_error(C.fcntl(sockfd, C.F_SETFD, C.O_NONBLOCK))! - } + $if net_nonblocking_sockets ? { + set_blocking(sockfd, false)! } // cast to the correct type diff --git a/vlib/net/unix/stream.c.v b/vlib/net/unix/stream.c.v index 247936c9cd..0e498216ce 100644 --- a/vlib/net/unix/stream.c.v +++ b/vlib/net/unix/stream.c.v @@ -374,13 +374,8 @@ fn new_stream_socket(socket_path string) !StreamSocket { eprintln(' new_unix_socket | s.handle: ${s.handle:6}') } - $if !net_blocking_sockets ? { - $if windows { - t := u32(1) // true - net.socket_error(C.ioctlsocket(handle, net.fionbio, &t))! - } $else { - net.socket_error(C.fcntl(handle, C.F_SETFL, C.fcntl(handle, C.F_GETFL) | C.O_NONBLOCK))! - } + $if net_nonblocking_sockets ? { + net.set_blocking(handle, false)! } return s } @@ -430,7 +425,7 @@ fn (mut s StreamSocket) connect(socket_path string) ! { alen := addr.len() eprintln(addr) - $if !net_blocking_sockets ? { + $if net_nonblocking_sockets ? { res := $if is_coroutine ? { C.photon_connect(s.handle, voidptr(&addr), alen, unix.unix_default_read_timeout) } $else { @@ -487,13 +482,8 @@ pub fn stream_socket_from_handle(sockfd int) !&StreamSocket { eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}') } - $if !net_blocking_sockets ? { - $if windows { - t := u32(1) // true - net.socket_error(C.ioctlsocket(sockfd, net.fionbio, &t))! - } $else { - net.socket_error(C.fcntl(sockfd, C.F_SETFL, C.fcntl(sockfd, C.F_GETFL) | C.O_NONBLOCK))! - } + $if net_nonblocking_sockets ? { + net.set_blocking(sockfd, false)! } return s }