diff --git a/cmd/tools/modules/testing/common.v b/cmd/tools/modules/testing/common.v index 772e2a7756..0ffa3b3599 100644 --- a/cmd/tools/modules/testing/common.v +++ b/cmd/tools/modules/testing/common.v @@ -188,6 +188,7 @@ pub fn new_test_session(_vargs string, will_compile bool) TestSession { // is only available on macos for now, and it is not yet trivial enough to // build/install on the CI: skip_files << 'examples/coroutines/simple_coroutines.v' + skip_files << 'examples/coroutines/coroutines_bench.v' $if msvc { skip_files << 'vlib/v/tests/const_comptime_eval_before_vinit_test.v' // _constructor used skip_files << 'vlib/v/tests/project_with_cpp_code/compiling_cpp_files_with_a_cplusplus_compiler_test.v' diff --git a/examples/coroutines/coroutines_bench.v b/examples/coroutines/coroutines_bench.v new file mode 100644 index 0000000000..1b489a603b --- /dev/null +++ b/examples/coroutines/coroutines_bench.v @@ -0,0 +1,37 @@ +// Build with (-gc none, until GC bug is fixed) +// v -gc none -use-coroutines coroutine_benchs.v +// +import coroutines +import time +import net.http +import sync + +const run_time = 10 * time.second + +fn request(mut mu sync.Mutex, count &int) { + for { + http.get('http://vlang.io/utc_now') or { panic(err) } + mu.@lock() + unsafe { + (*count)++ + } + mu.unlock() + } +} + +fn main() { + mut mu := sync.new_mutex() + mut count := 0 + + for _ in 0 .. 8 { + go request(mut mu, &count) + } + $if is_coroutine ? { + println('IS COROUTINE=true') + coroutines.sleep(run_time) + } $else { + println('IS COROUTINE=false') + time.sleep(run_time) + } + println('${count} requests made.') +} diff --git a/thirdparty/photon/photonwrapper.h b/thirdparty/photon/photonwrapper.h index 97833b01e3..347fc6e99d 100644 --- a/thirdparty/photon/photonwrapper.h +++ b/thirdparty/photon/photonwrapper.h @@ -1,6 +1,8 @@ #ifndef C_PHOTONWRAPPER_H_ #define C_PHOTONWRAPPER_H_ +#include + #ifdef __cplusplus @@ -12,23 +14,42 @@ #include #include #include - +#include +#include #include extern "C" { +// using namespace photon; +// WorkPool* work_pool; +// WorkPool* new_photon_work_pool(); +photon::WorkPool* work_pool; #else - #endif +// custom v functions +void init_photon_work_pool(size_t); +void photon_thread_create_and_migrate_to_work_pool(void* (* f)(void*), void* arg); +// direct wrappers to photon functions int photon_init_default(); void photon_thread_create(void* (* f)(void*), void* arg); void photon_sleep_s(int n); void photon_sleep_ms(int n); + +// void* default_photon_thread_stack_alloc(void*, size_t size); +// void default_photon_thread_stack_dealloc(void*, void* ptr, size_t size); void set_photon_thread_stack_allocator( void* (*alloc_func)(void*, size_t), void (*dealloc_func)(void*, void*, size_t) ); +int photon_socket(int domain, int type, int protocol); +int photon_connect(int fd, const struct sockaddr *addr, socklen_t addrlen, uint64_t timeout); +int photon_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, uint64_t timeout); +ssize_t photon_send(int fd, const void* buf, size_t len, int flags, uint64_t timeout); +// ssize_t photon_sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout); +ssize_t photon_recv(int fd, void* buf, size_t count, int flags, uint64_t timeout); +// ssize_t photon_recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout); + #ifdef __cplusplus } #endif diff --git a/vlib/coroutines/coroutines.v b/vlib/coroutines/coroutines.v index e0a7017f61..f0f340a238 100644 --- a/vlib/coroutines/coroutines.v +++ b/vlib/coroutines/coroutines.v @@ -3,6 +3,7 @@ // that can be found in the LICENSE file. module coroutines +import v.util import time #flag -I @VEXEROOT/thirdparty/photon @@ -10,8 +11,13 @@ import time #include "photonwrapper.h" -fn C.photon_init_default() int +// struct C.WorkPool {} +// fn C.new_photon_work_pool) C.WorkPool +fn C.init_photon_work_pool(int) + +// fn C.photon_thread_create_and_migrate_to_work_pool(f voidptr, arg voidptr) fn C.photon_thread_create(f voidptr, arg voidptr) +fn C.photon_init_default() int fn C.photon_sleep_s(n int) fn C.photon_sleep_ms(n int) fn C.set_photon_thread_stack_allocator(fn (voidptr, int) voidptr, fn (voidptr, voidptr, int)) @@ -41,6 +47,9 @@ fn init() { } C.set_photon_thread_stack_allocator(alloc, dealloc) ret := C.photon_init_default() + if util.nr_jobs > 0 { + C.init_photon_work_pool(util.nr_jobs) + } if ret < 0 { panic('failed to initialize coroutines via photon (ret=${ret})') } diff --git a/vlib/net/aasocket.c.v b/vlib/net/aasocket.c.v index 19063e1a57..083e3221cd 100644 --- a/vlib/net/aasocket.c.v +++ b/vlib/net/aasocket.c.v @@ -113,5 +113,11 @@ fn C.FD_ISSET(fd int, fdset &C.fd_set) int fn C.inet_pton(family AddrFamily, saddr &char, addr voidptr) int +fn C.photon_socket(domain AddrFamily, typ SocketType, protocol int) int +fn C.photon_connect(int, &Addr, u32, timeout u64) int +fn C.photon_accept(int, voidptr, int, timeout u64) int +fn C.photon_send(int, voidptr, int, int, timeout u64) int +fn C.photon_recv(int, voidptr, int, int, timeout u64) int + [typedef] pub struct C.fd_set {} diff --git a/vlib/net/tcp.v b/vlib/net/tcp.v index 91340d1ce1..08186badac 100644 --- a/vlib/net/tcp.v +++ b/vlib/net/tcp.v @@ -112,7 +112,11 @@ pub fn (mut c TcpConn) close() ! { } pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int { - mut res := wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + mut res := $if is_coroutine ? { + wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + } $else { + wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + } $if trace_tcp ? { eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') } @@ -127,7 +131,11 @@ pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int { code := error_code() if code == int(error_ewouldblock) { c.wait_for_read()! - res = wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + res = $if is_coroutine ? { + wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + } $else { + wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + } $if trace_tcp ? { eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') } @@ -177,7 +185,11 @@ pub fn (mut c TcpConn) write_ptr(b &u8, len int) !int { for total_sent < len { ptr := ptr_base + total_sent remaining := len - total_sent - mut sent := C.send(c.sock.handle, ptr, remaining, msg_nosignal) + mut sent := $if is_coroutine ? { + C.photon_send(c.sock.handle, ptr, remaining, msg_nosignal, c.write_timeout) + } $else { + C.send(c.sock.handle, ptr, remaining, msg_nosignal) + } $if trace_tcp_data_write ? { eprintln('>>> TcpConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}') } @@ -337,10 +349,18 @@ pub fn (mut l TcpListener) accept_only() !&TcpConn { eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}') } - mut new_handle := C.accept(l.sock.handle, 0, 0) + mut new_handle := $if is_coroutine ? { + C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) + } $else { + C.accept(l.sock.handle, 0, 0) + } if new_handle <= 0 { l.wait_for_accept()! - new_handle = C.accept(l.sock.handle, 0, 0) + new_handle = $if is_coroutine ? { + C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout) + } $else { + C.accept(l.sock.handle, 0, 0) + } if new_handle == -1 || new_handle == 0 { return error('accept failed') } @@ -389,7 +409,11 @@ struct TcpSocket { } fn new_tcp_socket(family AddrFamily) !TcpSocket { - handle := socket_error(C.socket(family, SocketType.tcp, 0))! + handle := $if is_coroutine ? { + socket_error(C.photon_socket(family, SocketType.tcp, 0))! + } $else { + socket_error(C.socket(family, SocketType.tcp, 0))! + } mut s := TcpSocket{ handle: handle } @@ -519,7 +543,11 @@ const ( fn (mut s TcpSocket) connect(a Addr) ! { $if !net_blocking_sockets ? { - res := C.connect(s.handle, voidptr(&a), a.len()) + res := $if is_coroutine ? { + C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout) + } $else { + C.connect(s.handle, voidptr(&a), a.len()) + } if res == 0 { return } @@ -555,7 +583,11 @@ fn (mut s TcpSocket) connect(a Addr) ! { wrap_error(ecode)! return } $else { - x := C.connect(s.handle, voidptr(&a), a.len()) + x := $if is_coroutine ? { + C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout) + } $else { + C.connect(s.handle, voidptr(&a), a.len()) + } socket_error(x)! } } diff --git a/vlib/v/gen/c/spawn_and_go.v b/vlib/v/gen/c/spawn_and_go.v index a1de7fe779..5aff5be0fd 100644 --- a/vlib/v/gen/c/spawn_and_go.v +++ b/vlib/v/gen/c/spawn_and_go.v @@ -161,7 +161,11 @@ fn (mut g Gen) spawn_and_go_expr(node ast.SpawnExpr, mode SpawnGoMode) { } } } else if is_go { - g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});') + if util.nr_jobs > 0 { + g.writeln('photon_thread_create_and_migrate_to_work_pool((void*)${wrapper_fn_name}, &${arg_tmp_var});') + } else { + g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});') + } } g.writeln('// end go') if node.is_expr {