sync: use SpinLock for channel (fix #24680) (#24802)

This commit is contained in:
kbkpbot 2025-06-28 19:55:06 +08:00 committed by GitHub
parent 2cfeb6d07b
commit bd465b5254
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 118 additions and 47 deletions

View File

@ -0,0 +1,89 @@
// Usage: v run run_bench.v
// It will generate a `result.md` in current dir.
import os
import arrays
import strings
const compilers = [
'tcc',
'clang',
'gcc',
]
const run_iterations = 10
const nobj = 10000000
const run_settings = [
[1, 1, 0],
[1, 1, 100],
[4, 4, 0],
[4, 4, 100],
]
fn get_perf_from_result(result string) !f32 {
lines := result.split_into_lines()
for l in lines {
if l.contains('objects') && l.contains('(') && l.contains(')') {
f := l.find_between('(', ')').all_before('objs/µs').trim_space().f32()
return f
}
}
return error('run fail?')
}
fn main() {
mut perf_result := []f32{}
for cc in compilers {
// 1. compile
compile_cmd := 'v channel_bench_v.v -cc ${cc}'
println('compile_cmd: ${compile_cmd}')
compile_result := os.execute(compile_cmd)
if compile_result.exit_code != 0 {
panic('compile fail with "${compile_cmd}"')
}
// 2. run
for s in run_settings {
run_cmd := './channel_bench_v ${s[0]:-3} ${s[1]:-3} ${s[2]:-3} ${nobj}'
println('-----------------------------------------------------------')
mut iteration_result := []f32{}
for i in 0 .. run_iterations {
print('${i:3}: ${run_cmd}')
run_result := os.execute(run_cmd)
f := get_perf_from_result(run_result.output)!
iteration_result << f
println(' => ${f:.2} objs/µs')
}
avg := arrays.sum(iteration_result)! / run_iterations
perf_result << avg
}
}
// 3. output result
mut sb := strings.new_builder(8192)
sb.write_string('\n| nsend | nrec | buflen |')
for cc in compilers {
sb.write_string(' **V (${cc:-5})** |')
}
sb.writeln('')
sb.write_string('| :---: | :---:| :---: |')
for _ in 0 .. compilers.len {
sb.write_string(' :---: |')
}
sb.writeln('')
for i, s in run_settings {
sb.write_string('| ${s[0]:-3} | ${s[1]:-3} | ${s[2]:-3} |')
for j in 0 .. compilers.len {
sb.write_string(' ${perf_result[j * run_settings.len + i]:-5.2} |')
}
sb.writeln('')
}
sb.writeln('')
println('***********************************************************')
println('writing result to `result.md`...')
println('***********************************************************')
os.write_file('result.md', sb.str())!
println(sb.str())
os.rm('./channel_bench_v')!
}

View File

@ -50,8 +50,8 @@ mut:
// for select // for select
write_subscriber &Subscription = unsafe { nil } write_subscriber &Subscription = unsafe { nil }
read_subscriber &Subscription = unsafe { nil } read_subscriber &Subscription = unsafe { nil }
write_sub_mtx u16 write_sub_mtx &SpinLock
read_sub_mtx u16 read_sub_mtx &SpinLock
closed u16 closed u16
pub: pub:
cap u32 // queue length in #objects cap u32 // queue length in #objects
@ -80,6 +80,8 @@ fn new_channel_st(n u32, st u32) &Channel {
statusbuf: sbuf statusbuf: sbuf
write_subscriber: unsafe { nil } write_subscriber: unsafe { nil }
read_subscriber: unsafe { nil } read_subscriber: unsafe { nil }
write_sub_mtx: new_spin_lock()
read_sub_mtx: new_spin_lock()
} }
ch.writesem.init(wsem) ch.writesem.init(wsem)
ch.readsem.init(rsem) ch.readsem.init(rsem)
@ -103,6 +105,8 @@ fn new_channel_st_noscan(n u32, st u32) &Channel {
statusbuf: sbuf statusbuf: sbuf
write_subscriber: unsafe { nil } write_subscriber: unsafe { nil }
read_subscriber: unsafe { nil } read_subscriber: unsafe { nil }
write_sub_mtx: new_spin_lock()
read_sub_mtx: new_spin_lock()
} }
ch.writesem.init(wsem) ch.writesem.init(wsem)
ch.readsem.init(rsem) ch.readsem.init(rsem)
@ -130,27 +134,24 @@ pub fn (mut ch Channel) close() {
} }
ch.readsem_im.post() ch.readsem_im.post()
ch.readsem.post() ch.readsem.post()
mut null16 := u16(0) ch.read_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != unsafe { nil } { if ch.read_subscriber != unsafe { nil } {
ch.read_subscriber.sem.post() ch.read_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) ch.read_sub_mtx.unlock()
null16 = u16(0) ch.write_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != unsafe { nil } { if ch.write_subscriber != unsafe { nil } {
ch.write_subscriber.sem.post() ch.write_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) ch.write_sub_mtx.unlock()
ch.writesem.post() ch.writesem.post()
if ch.cap == 0 { if ch.cap == 0 {
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil }) C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil })
} }
ch.writesem_im.post() ch.writesem_im.post()
// Do not destroy `read_sub_mtx` and `write_sub_mtx` here,
// because we can read from a closed channel later.
} }
@[inline] @[inline]
@ -236,15 +237,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
} }
} }
if !read_in_progress { if !read_in_progress {
mut null16 := u16(0) ch.read_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16,
u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != unsafe { nil } { if ch.read_subscriber != unsafe { nil } {
ch.read_subscriber.sem.post() ch.read_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) ch.read_sub_mtx.unlock()
} }
mut src2 := src mut src2 := src
for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
@ -335,14 +332,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written)) C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1) C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1)
ch.readsem.post() ch.readsem.post()
mut null16 := u16(0) ch.read_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != unsafe { nil } { if ch.read_subscriber != unsafe { nil } {
ch.read_subscriber.sem.post() ch.read_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) ch.read_sub_mtx.unlock()
return .success return .success
} else { } else {
if no_block { if no_block {
@ -456,14 +450,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused)) C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1) C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1)
ch.writesem.post() ch.writesem.post()
mut null16 := u16(0) ch.write_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != unsafe { nil } { if ch.write_subscriber != unsafe { nil } {
ch.write_subscriber.sem.post() ch.write_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) ch.write_sub_mtx.unlock()
return .success return .success
} }
} }
@ -484,14 +475,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
} }
} }
if ch.cap == 0 && !write_in_progress { if ch.cap == 0 && !write_in_progress {
mut null16 := u16(0) ch.write_sub_mtx.lock()
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != unsafe { nil } { if ch.write_subscriber != unsafe { nil } {
ch.write_subscriber.sem.post() ch.write_subscriber.sem.post()
} }
C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) ch.write_sub_mtx.unlock()
} }
mut dest2 := dest mut dest2 := dest
for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
@ -556,14 +544,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
for i, ch in channels { for i, ch in channels {
subscr[i].sem = unsafe { &sem } subscr[i].sem = unsafe { &sem }
sub_mtx, subscriber := if dir[i] == .push { sub_mtx, subscriber := if dir[i] == .push {
&ch.write_sub_mtx, &ch.write_subscriber ch.write_sub_mtx, &ch.write_subscriber
} else { } else {
&ch.read_sub_mtx, &ch.read_subscriber ch.read_sub_mtx, &ch.read_subscriber
}
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
null16 = u16(0)
} }
sub_mtx.lock()
subscr[i].prev = unsafe { subscriber } subscr[i].prev = unsafe { subscriber }
unsafe { unsafe {
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber), subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber),
@ -572,7 +557,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
if voidptr(subscr[i].nxt) != unsafe { nil } { if voidptr(subscr[i].nxt) != unsafe { nil } {
subscr[i].nxt.prev = unsafe { &subscr[i].nxt } subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
} }
C.atomic_store_u16(sub_mtx, u16(0)) sub_mtx.unlock()
} }
stopwatch := if timeout == time.infinite || timeout <= 0 { stopwatch := if timeout == time.infinite || timeout <= 0 {
time.StopWatch{} time.StopWatch{}
@ -620,14 +605,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
// reset subscribers // reset subscribers
for i, ch in channels { for i, ch in channels {
sub_mtx := if dir[i] == .push { sub_mtx := if dir[i] == .push {
&ch.write_sub_mtx ch.write_sub_mtx
} else { } else {
&ch.read_sub_mtx ch.read_sub_mtx
}
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
null16 = u16(0)
} }
sub_mtx.lock()
unsafe { unsafe {
*subscr[i].prev = subscr[i].nxt *subscr[i].prev = subscr[i].nxt
} }
@ -635,7 +617,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
subscr[i].nxt.prev = subscr[i].prev subscr[i].nxt.prev = subscr[i].prev
subscr[i].nxt.sem.post() subscr[i].nxt.sem.post()
} }
C.atomic_store_u16(sub_mtx, u16(0)) sub_mtx.unlock()
} }
sem.destroy() sem.destroy()
return event_idx return event_idx