net: fix non-blocking read/write (#20438)

This commit is contained in:
kbkpbot 2024-02-09 02:18:29 +08:00 committed by GitHub
parent 410bd9db71
commit a9ebab06da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 260 additions and 94 deletions

View File

@ -33,20 +33,76 @@ pub struct ShutdownConfig {
// By default it shuts it down in both directions, both for reading // By default it shuts it down in both directions, both for reading
// and for writing. You can change that using `net.shutdown(handle, how: .read)` // and for writing. You can change that using `net.shutdown(handle, how: .read)`
// or `net.shutdown(handle, how: .write)` // or `net.shutdown(handle, how: .write)`
// In non-blocking mode, `shutdown()` may not succeed immediately,
// so `select` is also used to make sure that the function doesn't return an incorrect result.
pub fn shutdown(handle int, config ShutdownConfig) int { pub fn shutdown(handle int, config ShutdownConfig) int {
$if windows { res := C.shutdown(handle, int(config.how))
return C.shutdown(handle, int(config.how)) $if !net_nonblocking_sockets ? {
return res
} $else { } $else {
return C.shutdown(handle, int(config.how)) if res == 0 {
return 0
}
ecode := error_code()
if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1
&& ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) {
write_result := select_deadline(handle, .write, time.now().add(connect_timeout)) or {
false
}
err := 0
len := sizeof(err)
xyz := C.getsockopt(handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len)
if xyz == 0 && err == 0 {
return 0
}
if write_result {
if xyz == 0 {
return err
}
return 0
}
}
return -ecode
} }
} }
// close a socket, given its file descriptor `handle`. // close a socket, given its file descriptor `handle`.
// In non-blocking mode, if `close()` does not succeed immediately,
// it causes an error to be propagated to `TcpSocket.close()`, which is not intended.
// Therefore, `select` is used just like `connect()`.
pub fn close(handle int) ! { pub fn close(handle int) ! {
$if windows { res := $if windows {
socket_error(C.closesocket(handle))! C.closesocket(handle)
} $else { } $else {
socket_error(C.close(handle))! C.close(handle)
}
$if !net_nonblocking_sockets ? {
socket_error(res)!
return
} $else {
if res == 0 {
return
}
ecode := error_code()
if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1
&& ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) {
write_result := select_deadline(handle, .write, time.now().add(connect_timeout))!
err := 0
len := sizeof(err)
xyz := C.getsockopt(handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len)
if xyz == 0 && err == 0 {
return
}
if write_result {
if xyz == 0 {
wrap_error(err)!
return
}
return
}
return err_timed_out
}
wrap_error(ecode)!
} }
} }
@ -95,8 +151,8 @@ fn select_deadline(handle int, test Select, deadline time.Time) !bool {
for infinite || time.now() <= deadline { for infinite || time.now() <= deadline {
timeout := if infinite { net.infinite_timeout } else { deadline - time.now() } timeout := if infinite { net.infinite_timeout } else { deadline - time.now() }
ready := @select(handle, test, timeout) or { ready := @select(handle, test, timeout) or {
if err.code() == 4 { if err.code() == C.EINTR {
// Spurious wakeup from signal, keep waiting // errno is 4, Spurious wakeup from signal, keep waiting
continue continue
} }

View File

@ -21,8 +21,10 @@ fn init() {
} }
pub const msg_nosignal = 0x4000 pub const msg_nosignal = 0x4000
pub const msg_dontwait = C.MSG_DONTWAIT
pub const error_ewouldblock = C.EWOULDBLOCK pub const error_ewouldblock = C.EWOULDBLOCK
pub const error_einprogress = C.EINPROGRESS pub const error_einprogress = C.EINPROGRESS
pub const error_eagain = C.EAGAIN
fn C.unlink(&char) int fn C.unlink(&char) int

View File

@ -10,8 +10,11 @@ const is_windows = true
// Constants that windows needs // Constants that windows needs
pub const fionbio = C.FIONBIO pub const fionbio = C.FIONBIO
pub const msg_nosignal = 0 pub const msg_nosignal = 0
pub const msg_dontwait = 0
pub const error_ewouldblock = WsaError.wsaewouldblock pub const error_ewouldblock = WsaError.wsaewouldblock
pub const error_einprogress = WsaError.wsaeinprogress pub const error_einprogress = WsaError.wsaeinprogress
pub const error_eagain = WsaError.wsaewouldblock // on windows, is also wsaewouldblock
const wsa_v22 = 0x202 const wsa_v22 = 0x202

View File

@ -9,3 +9,20 @@ pub:
pub fn (s &Socket) address() !Addr { pub fn (s &Socket) address() !Addr {
return addr_from_socket_handle(s.handle) return addr_from_socket_handle(s.handle)
} }
// set_blocking will change the state of the socket to either blocking,
// when state is true, or non blocking (false).
pub fn set_blocking(handle int, state bool) ! {
$if windows {
t := if state { u32(0) } else { u32(1) }
socket_error(C.ioctlsocket(handle, fionbio, &t))!
} $else {
mut flags := C.fcntl(handle, C.F_GETFL, 0)
if state {
flags &= ~C.O_NONBLOCK
} else {
flags |= C.O_NONBLOCK
}
socket_error(C.fcntl(handle, C.F_SETFL, flags))!
}
}

View File

@ -16,7 +16,7 @@ mut:
read_deadline time.Time read_deadline time.Time
read_timeout time.Duration read_timeout time.Duration
write_timeout time.Duration write_timeout time.Duration
is_blocking bool is_blocking bool = true
} }
pub fn dial_tcp(oaddress string) !&TcpConn { pub fn dial_tcp(oaddress string) !&TcpConn {
@ -49,11 +49,16 @@ pub fn dial_tcp(oaddress string) !&TcpConn {
continue continue
} }
return &TcpConn{ mut conn := &TcpConn{
sock: s sock: s
read_timeout: net.tcp_default_read_timeout read_timeout: net.tcp_default_read_timeout
write_timeout: net.tcp_default_write_timeout write_timeout: net.tcp_default_write_timeout
} }
// The blocking / non-blocking mode is determined before the connection is established.
$if net_nonblocking_sockets ? {
conn.is_blocking = false
}
return conn
} }
// Once we've failed now try and explain why we failed to connect // Once we've failed now try and explain why we failed to connect
@ -92,11 +97,16 @@ pub fn dial_tcp_with_bind(saddr string, laddr string) !&TcpConn {
continue continue
} }
return &TcpConn{ mut conn := &TcpConn{
sock: s sock: s
read_timeout: net.tcp_default_read_timeout read_timeout: net.tcp_default_read_timeout
write_timeout: net.tcp_default_write_timeout write_timeout: net.tcp_default_write_timeout
} }
// The blocking / non-blocking mode is determined before the connection is established.
$if net_nonblocking_sockets ? {
conn.is_blocking = false
}
return conn
} }
// failed // failed
return error('dial_tcp_with_bind failed for address ${saddr}') return error('dial_tcp_with_bind failed for address ${saddr}')
@ -110,10 +120,25 @@ pub fn (mut c TcpConn) close() ! {
} }
pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int { pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
mut should_ewouldblock := false
mut res := $if is_coroutine ? { mut res := $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout)
} $else { } $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! // The new socket returned by accept() behaves differently in blocking mode and needs special treatment.
mut has_data := true
if c.is_blocking {
if ok := @select(c.sock.handle, .read, 1) {
has_data = ok
} else {
false
}
}
if has_data {
C.recv(c.sock.handle, voidptr(buf_ptr), len, msg_dontwait)
} else {
should_ewouldblock = true
-1
}
} }
$if trace_tcp ? { $if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
@ -126,13 +151,13 @@ pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
} }
return res return res
} }
code := error_code() code := if should_ewouldblock { int(error_ewouldblock) } else { error_code() }
if code == int(error_ewouldblock) { if code in [int(error_ewouldblock), int(error_eagain), C.EINTR] {
c.wait_for_read()! c.wait_for_read()!
res = $if is_coroutine ? { res = $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout)
} $else { } $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! C.recv(c.sock.handle, voidptr(buf_ptr), len, msg_dontwait)
} }
$if trace_tcp ? { $if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
@ -193,7 +218,7 @@ pub fn (mut c TcpConn) write_ptr(b &u8, len int) !int {
} }
if sent < 0 { if sent < 0 {
code := error_code() code := error_code()
if code == int(error_ewouldblock) { if code in [int(error_ewouldblock), int(error_eagain), C.EINTR] {
c.wait_for_write()! c.wait_for_write()!
continue continue
} else { } else {
@ -291,6 +316,7 @@ pub mut:
mut: mut:
accept_timeout time.Duration accept_timeout time.Duration
accept_deadline time.Time accept_deadline time.Time
is_blocking bool = true
} }
@[params] @[params]
@ -316,11 +342,48 @@ pub fn listen_tcp(family AddrFamily, saddr string, options ListenOptions) !&TcpL
// cast to the correct type // cast to the correct type
alen := addr.len() alen := addr.len()
socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${saddr} failed')! socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${saddr} failed')!
socket_error_message(C.listen(s.handle, options.backlog), 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')! mut res := C.listen(s.handle, options.backlog)
return &TcpListener{ if res == 0 {
sock: s mut listener := &TcpListener{
accept_deadline: no_deadline sock: s
accept_timeout: infinite_timeout accept_deadline: no_deadline
accept_timeout: infinite_timeout
}
// The blocking / non-blocking mode is determined before the connection is established.
$if net_nonblocking_sockets ? {
listener.is_blocking = false
}
return listener
}
$if !net_nonblocking_sockets ? {
socket_error_message(res, 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')!
return &TcpListener(unsafe { nil }) // for compiler passed
} $else {
// non-blocking sockets may also not succeed immediately when they listen() and need to check the status and take action accordingly.
for {
code := error_code()
if code in [int(error_einprogress), int(error_ewouldblock), int(error_eagain), C.EINTR] {
@select(s.handle, .read, net.connect_timeout)!
res = C.listen(s.handle, options.backlog)
if res == 0 {
break
}
} else {
socket_error_message(res, 'listening on ${saddr} with maximum backlog pending queue of ${options.backlog}, failed')!
break // for compiler passed
}
}
mut listener := &TcpListener{
sock: s
accept_deadline: no_deadline
accept_timeout: infinite_timeout
}
// The blocking / non-blocking mode is determined before the connection is established.
$if net_nonblocking_sockets ? {
listener.is_blocking = false
}
return listener
} }
} }
@ -350,27 +413,39 @@ pub fn (mut l TcpListener) accept_only() !&TcpConn {
eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}') eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}')
} }
// The blocking mode `accept()` does not support a timeout option, so `select` is used instead.
$if !is_coroutine ? {
if l.is_blocking {
l.wait_for_accept()!
}
}
mut new_handle := $if is_coroutine ? { mut new_handle := $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else { } $else {
C.accept(l.sock.handle, 0, 0) C.accept(l.sock.handle, 0, 0)
} }
if !l.is_blocking && new_handle <= 0 {
code := error_code()
if code in [int(error_einprogress), int(error_ewouldblock), int(error_eagain), C.EINTR] {
l.wait_for_accept()!
new_handle = $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
}
}
if new_handle <= 0 { if new_handle <= 0 {
l.wait_for_accept()! return error('accept failed')
new_handle = $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle == -1 || new_handle == 0 {
return error('accept failed')
}
} }
return &TcpConn{ return &TcpConn{
handle: new_handle handle: new_handle
read_timeout: net.tcp_default_read_timeout read_timeout: net.tcp_default_read_timeout
write_timeout: net.tcp_default_write_timeout write_timeout: net.tcp_default_write_timeout
is_blocking: l.is_blocking
} }
} }
@ -426,15 +501,13 @@ fn new_tcp_socket(family AddrFamily) !TcpSocket {
// we shouldn't be using ioctlsocket in the 21st century // we shouldn't be using ioctlsocket in the 21st century
// use the non-blocking socket option instead please :) // use the non-blocking socket option instead please :)
// Some options need to be set before the connection is established, otherwise they will not work.
s.set_default_options()! s.set_default_options()!
$if !net_blocking_sockets ? { // Set the desired "blocking/non-blocking" mode before the connection is established,
$if windows { // and do not change it once the connection is successful.
t := u32(1) // true $if net_nonblocking_sockets ? {
socket_error(C.ioctlsocket(handle, fionbio, &t))! set_blocking(handle, false)!
} $else {
socket_error(C.fcntl(handle, C.F_SETFL, C.fcntl(handle, C.F_GETFL) | C.O_NONBLOCK))!
}
} }
return s return s
} }
@ -452,14 +525,6 @@ fn tcp_socket_from_handle(sockfd int) !TcpSocket {
} }
s.set_default_options()! s.set_default_options()!
$if !net_blocking_sockets ? {
$if windows {
t := u32(1) // true
socket_error(C.ioctlsocket(sockfd, fionbio, &t))!
} $else {
socket_error(C.fcntl(sockfd, C.F_SETFL, C.fcntl(sockfd, C.F_GETFL) | C.O_NONBLOCK))!
}
}
return s return s
} }
@ -534,14 +599,10 @@ fn (mut s TcpSocket) close() ! {
return close(s.handle) return close(s.handle)
} }
fn (mut s TcpSocket) @select(test Select, timeout time.Duration) !bool {
return @select(s.handle, test, timeout)
}
const connect_timeout = 5 * time.second const connect_timeout = 5 * time.second
fn (mut s TcpSocket) connect(a Addr) ! { fn (mut s TcpSocket) connect(a Addr) ! {
$if !net_blocking_sockets ? { $if net_nonblocking_sockets ? {
res := $if is_coroutine ? { res := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout) C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout)
} $else { } $else {
@ -553,8 +614,8 @@ fn (mut s TcpSocket) connect(a Addr) ! {
ecode := error_code() ecode := error_code()
// On nix non-blocking sockets we expect einprogress // On nix non-blocking sockets we expect einprogress
// On windows we expect res == -1 && error_code() == ewouldblock // On windows we expect res == -1 && error_code() == ewouldblock
if (is_windows && ecode == int(error_ewouldblock)) if (is_windows && ecode == int(error_ewouldblock)) || (!is_windows && res == -1
|| (!is_windows && res == -1 && ecode == int(error_einprogress)) { && ecode in [int(error_einprogress), int(error_eagain), C.EINTR]) {
// The socket is nonblocking and the connection cannot be completed // The socket is nonblocking and the connection cannot be completed
// immediately. (UNIX domain sockets failed with EAGAIN instead.) // immediately. (UNIX domain sockets failed with EAGAIN instead.)
// It is possible to select(2) or poll(2) for completion by selecting // It is possible to select(2) or poll(2) for completion by selecting
@ -563,7 +624,7 @@ fn (mut s TcpSocket) connect(a Addr) ! {
// determine whether connect() completed successfully (SO_ERROR is zero) or // determine whether connect() completed successfully (SO_ERROR is zero) or
// unsuccessfully (SO_ERROR is one of the usual error codes listed here, // unsuccessfully (SO_ERROR is one of the usual error codes listed here,
// ex plaining the reason for the failure). // ex plaining the reason for the failure).
write_result := s.@select(.write, net.connect_timeout)! write_result := @select(s.handle, .write, net.connect_timeout)!
err := 0 err := 0
len := sizeof(err) len := sizeof(err)
xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len) xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len)

View File

@ -0,0 +1,52 @@
import net
import time
fn server_thread(c_chan chan int) {
errors_no_data := [net.err_timed_out.code(), int(net.error_ewouldblock), int(net.error_eagain),
C.EINTR]
mut buf := []u8{len: 128}
mut times := 0
mut read_len := 0
mut listener := net.listen_tcp(.ip, ':22444') or { panic(err) }
c_chan <- 1
mut server := listener.accept() or { panic(err) }
server.set_read_timeout(2 * time.second)
server.set_blocking(false) or { panic(err) }
read_len = server.read(mut buf) or { // nothing can be read yet
assert err.code() in errors_no_data
-1
}
assert read_len == -1 // ensure there is an error with no data
read_len = server.read(mut buf) or { // nothing can be read yet
assert err.code() in errors_no_data
-2
}
assert read_len == -2 // ensure there is an error with no data
c_chan <- 2
for times < 10 {
times++
time.sleep(1 * time.millisecond)
read_len = server.read(mut buf) or {
if err.code() in errors_no_data {
continue
} else {
panic(err)
}
}
if read_len > 0 {
break
}
}
assert unsafe { tos_clone(&buf[0]) == 'hello' }
}
fn test_non_blocking_read() {
mut c_chan := chan int{cap: 1}
server := spawn server_thread(c_chan)
_ := <-c_chan // 1
mut conn := net.dial_tcp('127.0.0.1:22444') or { panic(err) }
conn.set_blocking(false) or { panic(err) }
_ := <-c_chan // 2
conn.write('hello'.bytes()) or { panic(err) }
server.wait()
}

View File

@ -18,25 +18,16 @@ pub fn (mut con TcpConn) get_blocking() bool {
// set_blocking will change the state of the connection to either blocking, // set_blocking will change the state of the connection to either blocking,
// when state is true, or non blocking (false). // when state is true, or non blocking (false).
// The default for `net` tcp connections is the non blocking mode. // The default for `net` tcp connections is the blocking mode.
// Calling .read_line will set the connection to blocking mode. // Calling .read_line will set the connection to blocking mode.
// In general, changing the blocking mode after a successful connection may cause unexpected surprises,
// so this function is not recommended to be called anywhere but for this file.
pub fn (mut con TcpConn) set_blocking(state bool) ! { pub fn (mut con TcpConn) set_blocking(state bool) ! {
con.is_blocking = state if con.is_blocking == state {
$if windows { return
mut t := u32(0)
if !con.is_blocking {
t = 1
}
socket_error(C.ioctlsocket(con.sock.handle, fionbio, &t))!
} $else {
mut flags := C.fcntl(con.sock.handle, C.F_GETFL, 0)
if state {
flags &= ~C.O_NONBLOCK
} else {
flags |= C.O_NONBLOCK
}
socket_error(C.fcntl(con.sock.handle, C.F_SETFL, flags))!
} }
con.is_blocking = state
set_blocking(con.sock.handle, state)!
} }
// read_line is a *simple*, *non customizable*, blocking line reader. // read_line is a *simple*, *non customizable*, blocking line reader.

View File

@ -211,14 +211,8 @@ fn new_udp_socket(local_addr Addr) !&UdpSocket {
s.set_dualstack(true)! s.set_dualstack(true)!
} }
$if !net_blocking_sockets ? { $if net_nonblocking_sockets ? {
// NOTE: refer to comments in tcp.v set_blocking(sockfd, false)!
$if windows {
t := u32(1) // true
socket_error(C.ioctlsocket(sockfd, fionbio, &t))!
} $else {
socket_error(C.fcntl(sockfd, C.F_SETFD, C.O_NONBLOCK))!
}
} }
// cast to the correct type // cast to the correct type

View File

@ -374,13 +374,8 @@ fn new_stream_socket(socket_path string) !StreamSocket {
eprintln(' new_unix_socket | s.handle: ${s.handle:6}') eprintln(' new_unix_socket | s.handle: ${s.handle:6}')
} }
$if !net_blocking_sockets ? { $if net_nonblocking_sockets ? {
$if windows { net.set_blocking(handle, false)!
t := u32(1) // true
net.socket_error(C.ioctlsocket(handle, net.fionbio, &t))!
} $else {
net.socket_error(C.fcntl(handle, C.F_SETFL, C.fcntl(handle, C.F_GETFL) | C.O_NONBLOCK))!
}
} }
return s return s
} }
@ -430,7 +425,7 @@ fn (mut s StreamSocket) connect(socket_path string) ! {
alen := addr.len() alen := addr.len()
eprintln(addr) eprintln(addr)
$if !net_blocking_sockets ? { $if net_nonblocking_sockets ? {
res := $if is_coroutine ? { res := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&addr), alen, unix.unix_default_read_timeout) C.photon_connect(s.handle, voidptr(&addr), alen, unix.unix_default_read_timeout)
} $else { } $else {
@ -487,13 +482,8 @@ pub fn stream_socket_from_handle(sockfd int) !&StreamSocket {
eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}') eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}')
} }
$if !net_blocking_sockets ? { $if net_nonblocking_sockets ? {
$if windows { net.set_blocking(sockfd, false)!
t := u32(1) // true
net.socket_error(C.ioctlsocket(sockfd, net.fionbio, &t))!
} $else {
net.socket_error(C.fcntl(sockfd, C.F_SETFL, C.fcntl(sockfd, C.F_GETFL) | C.O_NONBLOCK))!
}
} }
return s return s
} }