v/vlib/net/unix/stream.c.v

510 lines
14 KiB
V

module unix
import time
import os
import net
const unix_default_read_timeout = 30 * time.second
const unix_default_write_timeout = 30 * time.second
const connect_timeout = 5 * time.second
const msg_nosignal = 0x4000
// UnixDialer is a concrete instance of the Dialer interface,
// for creating unix socket connections.
pub struct UnixDialer {}
// dial will try to create a new abstract connection to the given address.
// It will return an error, if that is not possible.
pub fn (u UnixDialer) dial(address string) !net.Connection {
return connect_stream(address)!
}
@[heap]
pub struct StreamConn {
pub mut:
sock StreamSocket
mut:
handle int
write_deadline time.Time
read_deadline time.Time
read_timeout time.Duration
write_timeout time.Duration
is_blocking bool
}
// connect_stream returns a SOCK_STREAM connection for an unix domain socket on `socket_path`
pub fn connect_stream(socket_path string) !&StreamConn {
if socket_path.len >= max_sun_path {
return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
}
mut s := new_stream_socket(socket_path) or {
return error('${err.msg()}; could not create new unix socket')
}
s.connect(socket_path)!
return &StreamConn{
sock: s
read_timeout: unix_default_read_timeout
write_timeout: unix_default_write_timeout
}
}
// addr returns the local address of the stream
pub fn (c StreamConn) addr() !net.Addr {
return error('not implemented for unix connections')
}
// peer_addr returns the address of the remote peer of the stream
pub fn (c StreamConn) peer_addr() !net.Addr {
return error('not implemented for unix connections')
}
// close closes the connection
pub fn (mut c StreamConn) close() ! {
$if trace_unix ? {
eprintln(' StreamConn.close | c.sock.handle: ${c.sock.handle:6}')
}
c.sock.close()!
}
// write_ptr blocks and attempts to write all data
pub fn (mut c StreamConn) write_ptr(b &u8, len int) !int {
$if trace_unix_sock_handle ? {
eprintln('>>> StreamConn.write_ptr | c: ${ptr_str(c)} | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} | len: ${len}')
}
$if trace_unix ? {
eprintln(
'>>> StreamConn.write_ptr | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} len: ${len} |\n' +
unsafe { b.vstring_with_len(len) })
}
$if trace_unix_data_write ? {
eprintln(
'>>> StreamConn.write_ptr | data.len: ${len:6} | hex: ${unsafe { b.vbytes(len) }.hex()} | data: ' +
unsafe { b.vstring_with_len(len) })
}
unsafe {
mut ptr_base := &u8(b)
mut total_sent := 0
for total_sent < len {
ptr := ptr_base + total_sent
remaining := len - total_sent
mut sent := $if is_coroutine ? {
C.photon_send(c.sock.handle, ptr, remaining, net.msg_nosignal, c.write_timeout)
} $else {
C.send(c.sock.handle, ptr, remaining, net.msg_nosignal)
}
$if trace_unix_data_write ? {
eprintln('>>> UnixConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}')
}
if sent < 0 {
code := net.error_code()
if code == int(net.error_ewouldblock) {
c.wait_for_write()!
continue
} else {
net.wrap_error(code)!
}
}
total_sent += sent
}
return total_sent
}
}
// write blocks and attempts to write all data
pub fn (mut c StreamConn) write(bytes []u8) !int {
return c.write_ptr(bytes.data, bytes.len)
}
// write_string blocks and attempts to write all data
pub fn (mut c StreamConn) write_string(s string) !int {
return c.write_ptr(s.str, s.len)
}
// read_ptr attempts to write all data
pub fn (mut c StreamConn) read_ptr(buf_ptr &u8, len int) !int {
mut res := $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_unix ? {
eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
if res > 0 {
$if trace_unix_data_read ? {
eprintln(
'<<< StreamConn.read_ptr | 1 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' +
unsafe { buf_ptr.vstring_with_len(res) })
}
return res
}
code := net.error_code()
if code == int(net.error_ewouldblock) {
c.wait_for_read()!
res = $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_unix ? {
eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
$if trace_unix_data_read ? {
if res > 0 {
eprintln(
'<<< StreamConn.read_ptr | 2 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' +
unsafe { buf_ptr.vstring_with_len(res) })
}
}
return net.socket_error(res)
} else {
net.wrap_error(code)!
}
return error('none')
}
// read data into `buf`
pub fn (mut c StreamConn) read(mut buf []u8) !int {
return c.read_ptr(buf.data, buf.len)
}
// read_deadline returns the read deadline
pub fn (mut c StreamConn) read_deadline() !time.Time {
if c.read_deadline.unix() == 0 {
return c.read_deadline
}
return error('none')
}
// set_read_deadlien sets the read deadline
pub fn (mut c StreamConn) set_read_deadline(deadline time.Time) {
c.read_deadline = deadline
}
// write_deadline returns the write deadline
pub fn (mut c StreamConn) write_deadline() !time.Time {
if c.write_deadline.unix() == 0 {
return c.write_deadline
}
return error('none')
}
// set_write_deadline sets the write deadline
pub fn (mut c StreamConn) set_write_deadline(deadline time.Time) {
c.write_deadline = deadline
}
// read_timeout returns the read timeout
pub fn (c &StreamConn) read_timeout() time.Duration {
return c.read_timeout
}
// set_read_timeout sets the read timeout
pub fn (mut c StreamConn) set_read_timeout(t time.Duration) {
c.read_timeout = t
}
// write_timeout returns the write timeout
pub fn (c &StreamConn) write_timeout() time.Duration {
return c.write_timeout
}
// set_write_timeout sets the write timeout
pub fn (mut c StreamConn) set_write_timeout(t time.Duration) {
c.write_timeout = t
}
// wait_for_read blocks until the socket is ready to read
@[inline]
pub fn (mut c StreamConn) wait_for_read() ! {
return wait_for_read(c.sock.handle, c.read_deadline, c.read_timeout)
}
// wait_for_read blocks until the socket is ready to write
@[inline]
pub fn (mut c StreamConn) wait_for_write() ! {
return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout)
}
// str returns a string representation of connection `c`
pub fn (c StreamConn) str() string {
s := c.sock.str().replace('\n', ' ').replace(' ', ' ')
return 'StreamConn{ write_deadline: ${c.write_deadline}, read_deadline: ${c.read_deadline}, read_timeout: ${c.read_timeout}, write_timeout: ${c.write_timeout}, sock: ${s} }'
}
pub struct StreamListener {
pub mut:
sock StreamSocket
mut:
accept_timeout time.Duration
accept_deadline time.Time
}
@[params]
pub struct ListenOptions {
pub:
backlog int = 128
}
// listen_stream creates an unix domain socket at `socket_path`
pub fn listen_stream(socket_path string, options ListenOptions) !&StreamListener {
if socket_path.len >= max_sun_path {
return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
}
mut s := new_stream_socket(socket_path) or {
return error('${err.msg()}; could not create new unix stream socket')
}
addrs := net.resolve_addrs(socket_path, .unix, .tcp) or {
return error('${err.msg()}; could not resolve path ${socket_path}')
}
addr := addrs[0]
// cast to the correct type
alen := addr.len()
// try to unlink/remove an existing filesystem object at `socket_path`. Ignore errors,
// because it's ok if the path doesn't exists and if it exists, but can't be unlinked
// then `bind` will generate an error
$if windows {
os.rm(socket_path) or {}
} $else {
C.unlink(&char(socket_path.str))
}
net.socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${socket_path} failed')!
net.socket_error_message(C.listen(s.handle, options.backlog), 'listening on ${socket_path} with maximum backlog pending queue of ${options.backlog}, failed')!
return &StreamListener{
sock: s
accept_deadline: no_deadline
accept_timeout: infinite_timeout
}
}
// accept accepts blocks until a new connection occurs
pub fn (mut l StreamListener) accept() !&StreamConn {
$if trace_unix ? {
eprintln(' StreamListener.accept | l.sock.handle: ${l.sock.handle:6}')
}
mut new_handle := $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, unix_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle <= 0 {
l.wait_for_accept()!
new_handle = $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, unix_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle == -1 || new_handle == 0 {
return error('accept failed')
}
}
mut c := &StreamConn{
handle: new_handle
read_timeout: unix_default_read_timeout
write_timeout: unix_default_write_timeout
}
c.sock = stream_socket_from_handle(c.handle)!
return c
}
// accept_deadline returns the deadline until a new client is accepted
pub fn (l &StreamListener) accept_deadline() !time.Time {
if l.accept_deadline.unix() != 0 {
return l.accept_deadline
}
return error('no deadline')
}
// set_accept_deadline sets the deadlinme until a new client is accepted
pub fn (mut l StreamListener) set_accept_deadline(deadline time.Time) {
l.accept_deadline = deadline
}
// accept_timeout returns the timeout until a new client is accepted
pub fn (l &StreamListener) accept_timeout() time.Duration {
return l.accept_timeout
}
// set_accept_timeout sets the timeout until a new client is accepted
pub fn (mut l StreamListener) set_accept_timeout(t time.Duration) {
l.accept_timeout = t
}
// wait_for_accept blocks until a client can be accepted
pub fn (mut l StreamListener) wait_for_accept() ! {
return wait_for_read(l.sock.handle, l.accept_deadline, l.accept_timeout)
}
// close closes the listening socket and unlinks/removes the socket file
pub fn (mut l StreamListener) close() ! {
l.sock.close()!
l.unlink()!
}
// unlink removes the unix socket from the file system
pub fn (mut l StreamListener) unlink() ! {
$if windows {
os.rm(l.sock.socket_path)!
} $else {
net.socket_error_message(C.unlink(&char(l.sock.socket_path.str)), 'could not unlink ${l.sock.socket_path}')!
}
}
// unlink_on_signal removes the socket from the filesystem when signal `signum` occurs
pub fn (mut l StreamListener) unlink_on_signal(signum os.Signal) ! {
os.signal_opt(.int, fn [mut l] (sign os.Signal) {
$if trace_unix ? {
eprintln(' StreamListener.unlink_on_signal received signal ${sign}; unlinking unix socket ${l.sock.socket_path}')
}
l.unlink() or {}
exit(1)
})!
}
// addr returns the `net.Addr` version of the listening socket's path
pub fn (mut l StreamListener) addr() !net.Addr {
return l.sock.address()!
}
pub struct StreamSocket {
net.Socket
mut:
socket_path string
}
fn new_stream_socket(socket_path string) !StreamSocket {
handle := $if is_coroutine ? {
net.socket_error(C.photon_socket(.unix, .tcp, 0))!
} $else {
net.socket_error(C.socket(.unix, .tcp, 0))!
}
mut s := StreamSocket{
handle: handle
socket_path: socket_path
}
$if trace_unix ? {
eprintln(' new_unix_socket | s.handle: ${s.handle:6}')
}
$if net_nonblocking_sockets ? {
net.set_blocking(handle, false)!
}
return s
}
fn (mut s StreamSocket) close() ! {
// shutdown might be redundant for unix domain sockets, but it doesn't hurt to call it
shutdown(s.handle)
return close(s.handle)
}
fn (mut s StreamSocket) select(test Select, timeout time.Duration) !bool {
return select(s.handle, test, timeout)
}
// set_option sets an option on the socket
fn (mut s StreamSocket) set_option(level int, opt int, value int) ! {
net.socket_error(C.setsockopt(s.handle, level, opt, &value, sizeof(int)))!
}
// set_option_bool sets a boolean option on the socket
pub fn (mut s StreamSocket) set_option_bool(opt net.SocketOption, value bool) ! {
if opt !in net.opts_can_set {
return net.err_option_not_settable
}
if opt !in net.opts_bool {
return net.err_option_wrong_type
}
x := int(value)
s.set_option(C.SOL_SOCKET, int(opt), x)!
}
// set_option_bool sets an int option on the socket
pub fn (mut s StreamSocket) set_option_int(opt net.SocketOption, value int) ! {
s.set_option(C.SOL_SOCKET, int(opt), value)!
}
fn (mut s StreamSocket) connect(socket_path string) ! {
if socket_path.len >= max_sun_path {
return error('Socket path too long! Max length: ${max_sun_path - 1} chars.')
}
addrs := net.resolve_addrs(socket_path, .unix, .tcp) or {
return error('${err.msg()}; could not resolve path ${socket_path}')
}
addr := addrs[0]
// cast to the correct type
alen := addr.len()
$if net_nonblocking_sockets ? {
res := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&addr), alen, unix_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&addr), alen)
}
if res == 0 {
return
}
ecode := net.error_code()
// no need to check for einprogress on nix
// On windows we expect res == -1 && net.error_code() == ewouldblock
$if windows {
if ecode == int(net.error_ewouldblock) {
// The socket is nonblocking and the connection cannot be completed
// immediately. Wait till the socket is ready to write
write_result := s.select(.write, connect_timeout)!
err := 0
len := sizeof(err)
// determine whether connect() completed successfully (SO_ERROR is zero)
xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len)
if xyz == 0 && err == 0 {
return
}
if write_result {
if xyz == 0 {
net.wrap_error(err)!
return
}
return
}
return net.err_timed_out
}
}
net.wrap_error(ecode)!
return
} $else {
x := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&addr), alen, unix_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&addr), alen)
}
net.socket_error(x)!
}
}
// stream_socket_from_handle returns a `StreamSocket` instance from the raw file descriptor `sockfd`
pub fn stream_socket_from_handle(sockfd int) !&StreamSocket {
mut s := &StreamSocket{
handle: sockfd
}
$if trace_unix ? {
eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}')
}
$if net_nonblocking_sockets ? {
net.set_blocking(sockfd, false)!
}
return s
}