coroutines: use photon work_pool when nr_jobs > 0, and use photon libc fn wrappers (#19711)

This commit is contained in:
Joe C 2023-11-01 04:12:52 +11:00 committed by GitHub
parent 57a7db11bf
commit a63f3e6f77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 122 additions and 12 deletions

View File

@ -188,6 +188,7 @@ pub fn new_test_session(_vargs string, will_compile bool) TestSession {
// is only available on macos for now, and it is not yet trivial enough to
// build/install on the CI:
skip_files << 'examples/coroutines/simple_coroutines.v'
skip_files << 'examples/coroutines/coroutines_bench.v'
$if msvc {
skip_files << 'vlib/v/tests/const_comptime_eval_before_vinit_test.v' // _constructor used
skip_files << 'vlib/v/tests/project_with_cpp_code/compiling_cpp_files_with_a_cplusplus_compiler_test.v'

View File

@ -0,0 +1,37 @@
// Build with (-gc none, until GC bug is fixed)
// v -gc none -use-coroutines coroutine_benchs.v
//
import coroutines
import time
import net.http
import sync
const run_time = 10 * time.second
fn request(mut mu sync.Mutex, count &int) {
for {
http.get('http://vlang.io/utc_now') or { panic(err) }
mu.@lock()
unsafe {
(*count)++
}
mu.unlock()
}
}
fn main() {
mut mu := sync.new_mutex()
mut count := 0
for _ in 0 .. 8 {
go request(mut mu, &count)
}
$if is_coroutine ? {
println('IS COROUTINE=true')
coroutines.sleep(run_time)
} $else {
println('IS COROUTINE=false')
time.sleep(run_time)
}
println('${count} requests made.')
}

View File

@ -1,6 +1,8 @@
#ifndef C_PHOTONWRAPPER_H_
#define C_PHOTONWRAPPER_H_
#include <sys/socket.h>
#ifdef __cplusplus
@ -12,23 +14,42 @@
#include <photon/common/iovector.h>
#include <photon/fs/localfs.h>
#include <photon/net/socket.h>
#include <photon/net/basic_socket.h>
#include <photon/thread/workerpool.h>
#include <iostream>
extern "C" {
// using namespace photon;
// WorkPool* work_pool;
// WorkPool* new_photon_work_pool();
photon::WorkPool* work_pool;
#else
#endif
// custom v functions
void init_photon_work_pool(size_t);
void photon_thread_create_and_migrate_to_work_pool(void* (* f)(void*), void* arg);
// direct wrappers to photon functions
int photon_init_default();
void photon_thread_create(void* (* f)(void*), void* arg);
void photon_sleep_s(int n);
void photon_sleep_ms(int n);
// void* default_photon_thread_stack_alloc(void*, size_t size);
// void default_photon_thread_stack_dealloc(void*, void* ptr, size_t size);
void set_photon_thread_stack_allocator(
void* (*alloc_func)(void*, size_t),
void (*dealloc_func)(void*, void*, size_t)
);
int photon_socket(int domain, int type, int protocol);
int photon_connect(int fd, const struct sockaddr *addr, socklen_t addrlen, uint64_t timeout);
int photon_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, uint64_t timeout);
ssize_t photon_send(int fd, const void* buf, size_t len, int flags, uint64_t timeout);
// ssize_t photon_sendmsg(int fd, const struct msghdr* msg, int flags, uint64_t timeout);
ssize_t photon_recv(int fd, void* buf, size_t count, int flags, uint64_t timeout);
// ssize_t photon_recvmsg(int fd, struct msghdr* msg, int flags, uint64_t timeout);
#ifdef __cplusplus
}
#endif

View File

@ -3,6 +3,7 @@
// that can be found in the LICENSE file.
module coroutines
import v.util
import time
#flag -I @VEXEROOT/thirdparty/photon
@ -10,8 +11,13 @@ import time
#include "photonwrapper.h"
fn C.photon_init_default() int
// struct C.WorkPool {}
// fn C.new_photon_work_pool) C.WorkPool
fn C.init_photon_work_pool(int)
// fn C.photon_thread_create_and_migrate_to_work_pool(f voidptr, arg voidptr)
fn C.photon_thread_create(f voidptr, arg voidptr)
fn C.photon_init_default() int
fn C.photon_sleep_s(n int)
fn C.photon_sleep_ms(n int)
fn C.set_photon_thread_stack_allocator(fn (voidptr, int) voidptr, fn (voidptr, voidptr, int))
@ -41,6 +47,9 @@ fn init() {
}
C.set_photon_thread_stack_allocator(alloc, dealloc)
ret := C.photon_init_default()
if util.nr_jobs > 0 {
C.init_photon_work_pool(util.nr_jobs)
}
if ret < 0 {
panic('failed to initialize coroutines via photon (ret=${ret})')
}

View File

@ -113,5 +113,11 @@ fn C.FD_ISSET(fd int, fdset &C.fd_set) int
fn C.inet_pton(family AddrFamily, saddr &char, addr voidptr) int
fn C.photon_socket(domain AddrFamily, typ SocketType, protocol int) int
fn C.photon_connect(int, &Addr, u32, timeout u64) int
fn C.photon_accept(int, voidptr, int, timeout u64) int
fn C.photon_send(int, voidptr, int, int, timeout u64) int
fn C.photon_recv(int, voidptr, int, int, timeout u64) int
[typedef]
pub struct C.fd_set {}

View File

@ -112,7 +112,11 @@ pub fn (mut c TcpConn) close() ! {
}
pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
mut res := wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
mut res := $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
@ -127,7 +131,11 @@ pub fn (c TcpConn) read_ptr(buf_ptr &u8, len int) !int {
code := error_code()
if code == int(error_ewouldblock) {
c.wait_for_read()!
res = wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
res = $if is_coroutine ? {
wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))!
} $else {
wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))!
}
$if trace_tcp ? {
eprintln('<<< TcpConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}')
}
@ -177,7 +185,11 @@ pub fn (mut c TcpConn) write_ptr(b &u8, len int) !int {
for total_sent < len {
ptr := ptr_base + total_sent
remaining := len - total_sent
mut sent := C.send(c.sock.handle, ptr, remaining, msg_nosignal)
mut sent := $if is_coroutine ? {
C.photon_send(c.sock.handle, ptr, remaining, msg_nosignal, c.write_timeout)
} $else {
C.send(c.sock.handle, ptr, remaining, msg_nosignal)
}
$if trace_tcp_data_write ? {
eprintln('>>> TcpConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}')
}
@ -337,10 +349,18 @@ pub fn (mut l TcpListener) accept_only() !&TcpConn {
eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}')
}
mut new_handle := C.accept(l.sock.handle, 0, 0)
mut new_handle := $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle <= 0 {
l.wait_for_accept()!
new_handle = C.accept(l.sock.handle, 0, 0)
new_handle = $if is_coroutine ? {
C.photon_accept(l.sock.handle, 0, 0, net.tcp_default_read_timeout)
} $else {
C.accept(l.sock.handle, 0, 0)
}
if new_handle == -1 || new_handle == 0 {
return error('accept failed')
}
@ -389,7 +409,11 @@ struct TcpSocket {
}
fn new_tcp_socket(family AddrFamily) !TcpSocket {
handle := socket_error(C.socket(family, SocketType.tcp, 0))!
handle := $if is_coroutine ? {
socket_error(C.photon_socket(family, SocketType.tcp, 0))!
} $else {
socket_error(C.socket(family, SocketType.tcp, 0))!
}
mut s := TcpSocket{
handle: handle
}
@ -519,7 +543,11 @@ const (
fn (mut s TcpSocket) connect(a Addr) ! {
$if !net_blocking_sockets ? {
res := C.connect(s.handle, voidptr(&a), a.len())
res := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&a), a.len())
}
if res == 0 {
return
}
@ -555,7 +583,11 @@ fn (mut s TcpSocket) connect(a Addr) ! {
wrap_error(ecode)!
return
} $else {
x := C.connect(s.handle, voidptr(&a), a.len())
x := $if is_coroutine ? {
C.photon_connect(s.handle, voidptr(&a), a.len(), net.tcp_default_read_timeout)
} $else {
C.connect(s.handle, voidptr(&a), a.len())
}
socket_error(x)!
}
}

View File

@ -161,7 +161,11 @@ fn (mut g Gen) spawn_and_go_expr(node ast.SpawnExpr, mode SpawnGoMode) {
}
}
} else if is_go {
g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});')
if util.nr_jobs > 0 {
g.writeln('photon_thread_create_and_migrate_to_work_pool((void*)${wrapper_fn_name}, &${arg_tmp_var});')
} else {
g.writeln('photon_thread_create((void*)${wrapper_fn_name}, &${arg_tmp_var});')
}
}
g.writeln('// end go')
if node.is_expr {