diff --git a/vlib/sync/bench/run_bench.v b/vlib/sync/bench/run_bench.v new file mode 100644 index 0000000000..2c58dc7d42 --- /dev/null +++ b/vlib/sync/bench/run_bench.v @@ -0,0 +1,89 @@ +// Usage: v run run_bench.v +// It will generate a `result.md` in current dir. +import os +import arrays +import strings + +const compilers = [ + 'tcc', + 'clang', + 'gcc', +] + +const run_iterations = 10 + +const nobj = 10000000 +const run_settings = [ + [1, 1, 0], + [1, 1, 100], + [4, 4, 0], + [4, 4, 100], +] + +fn get_perf_from_result(result string) !f32 { + lines := result.split_into_lines() + for l in lines { + if l.contains('objects') && l.contains('(') && l.contains(')') { + f := l.find_between('(', ')').all_before('objs/µs').trim_space().f32() + return f + } + } + return error('run fail?') +} + +fn main() { + mut perf_result := []f32{} + + for cc in compilers { + // 1. compile + compile_cmd := 'v channel_bench_v.v -cc ${cc}' + println('compile_cmd: ${compile_cmd}') + compile_result := os.execute(compile_cmd) + if compile_result.exit_code != 0 { + panic('compile fail with "${compile_cmd}"') + } + + // 2. run + for s in run_settings { + run_cmd := './channel_bench_v ${s[0]:-3} ${s[1]:-3} ${s[2]:-3} ${nobj}' + println('-----------------------------------------------------------') + mut iteration_result := []f32{} + for i in 0 .. run_iterations { + print('${i:3}: ${run_cmd}') + run_result := os.execute(run_cmd) + f := get_perf_from_result(run_result.output)! + iteration_result << f + println(' => ${f:.2} objs/µs') + } + avg := arrays.sum(iteration_result)! / run_iterations + perf_result << avg + } + } + + // 3. output result + mut sb := strings.new_builder(8192) + sb.write_string('\n| nsend | nrec | buflen |') + for cc in compilers { + sb.write_string(' **V (${cc:-5})** |') + } + sb.writeln('') + sb.write_string('| :---: | :---:| :---: |') + for _ in 0 .. compilers.len { + sb.write_string(' :---: |') + } + sb.writeln('') + for i, s in run_settings { + sb.write_string('| ${s[0]:-3} | ${s[1]:-3} | ${s[2]:-3} |') + for j in 0 .. compilers.len { + sb.write_string(' ${perf_result[j * run_settings.len + i]:-5.2} |') + } + sb.writeln('') + } + sb.writeln('') + println('***********************************************************') + println('writing result to `result.md`...') + println('***********************************************************') + os.write_file('result.md', sb.str())! + println(sb.str()) + os.rm('./channel_bench_v')! +} diff --git a/vlib/sync/channels.c.v b/vlib/sync/channels.c.v index f25a3e6d8f..29a1754a93 100644 --- a/vlib/sync/channels.c.v +++ b/vlib/sync/channels.c.v @@ -50,8 +50,8 @@ mut: // for select write_subscriber &Subscription = unsafe { nil } read_subscriber &Subscription = unsafe { nil } - write_sub_mtx u16 - read_sub_mtx u16 + write_sub_mtx &SpinLock + read_sub_mtx &SpinLock closed u16 pub: cap u32 // queue length in #objects @@ -80,6 +80,8 @@ fn new_channel_st(n u32, st u32) &Channel { statusbuf: sbuf write_subscriber: unsafe { nil } read_subscriber: unsafe { nil } + write_sub_mtx: new_spin_lock() + read_sub_mtx: new_spin_lock() } ch.writesem.init(wsem) ch.readsem.init(rsem) @@ -103,6 +105,8 @@ fn new_channel_st_noscan(n u32, st u32) &Channel { statusbuf: sbuf write_subscriber: unsafe { nil } read_subscriber: unsafe { nil } + write_sub_mtx: new_spin_lock() + read_sub_mtx: new_spin_lock() } ch.writesem.init(wsem) ch.readsem.init(rsem) @@ -130,27 +134,24 @@ pub fn (mut ch Channel) close() { } ch.readsem_im.post() ch.readsem.post() - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } + ch.read_sub_mtx.lock() if ch.read_subscriber != unsafe { nil } { ch.read_subscriber.sem.post() } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) - null16 = u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } + ch.read_sub_mtx.unlock() + ch.write_sub_mtx.lock() if ch.write_subscriber != unsafe { nil } { ch.write_subscriber.sem.post() } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + ch.write_sub_mtx.unlock() ch.writesem.post() if ch.cap == 0 { C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil }) } ch.writesem_im.post() + + // Do not destroy `read_sub_mtx` and `write_sub_mtx` here, + // because we can read from a closed channel later. } @[inline] @@ -236,15 +237,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { } } if !read_in_progress { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16, - u16(1)) { - null16 = u16(0) - } + ch.read_sub_mtx.lock() if ch.read_subscriber != unsafe { nil } { ch.read_subscriber.sem.post() } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + ch.read_sub_mtx.unlock() } mut src2 := src for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { @@ -335,14 +332,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written)) C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1) ch.readsem.post() - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } + ch.read_sub_mtx.lock() if ch.read_subscriber != unsafe { nil } { ch.read_subscriber.sem.post() } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + ch.read_sub_mtx.unlock() return .success } else { if no_block { @@ -456,14 +450,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused)) C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1) ch.writesem.post() - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } + ch.write_sub_mtx.lock() if ch.write_subscriber != unsafe { nil } { ch.write_subscriber.sem.post() } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + ch.write_sub_mtx.unlock() return .success } } @@ -484,14 +475,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { } } if ch.cap == 0 && !write_in_progress { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } + ch.write_sub_mtx.lock() if ch.write_subscriber != unsafe { nil } { ch.write_subscriber.sem.post() } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + ch.write_sub_mtx.unlock() } mut dest2 := dest for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { @@ -556,14 +544,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo for i, ch in channels { subscr[i].sem = unsafe { &sem } sub_mtx, subscriber := if dir[i] == .push { - &ch.write_sub_mtx, &ch.write_subscriber + ch.write_sub_mtx, &ch.write_subscriber } else { - &ch.read_sub_mtx, &ch.read_subscriber - } - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) { - null16 = u16(0) + ch.read_sub_mtx, &ch.read_subscriber } + sub_mtx.lock() subscr[i].prev = unsafe { subscriber } unsafe { subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber), @@ -572,7 +557,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo if voidptr(subscr[i].nxt) != unsafe { nil } { subscr[i].nxt.prev = unsafe { &subscr[i].nxt } } - C.atomic_store_u16(sub_mtx, u16(0)) + sub_mtx.unlock() } stopwatch := if timeout == time.infinite || timeout <= 0 { time.StopWatch{} @@ -620,14 +605,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo // reset subscribers for i, ch in channels { sub_mtx := if dir[i] == .push { - &ch.write_sub_mtx + ch.write_sub_mtx } else { - &ch.read_sub_mtx - } - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) { - null16 = u16(0) + ch.read_sub_mtx } + sub_mtx.lock() unsafe { *subscr[i].prev = subscr[i].nxt } @@ -635,7 +617,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo subscr[i].nxt.prev = subscr[i].prev subscr[i].nxt.sem.post() } - C.atomic_store_u16(sub_mtx, u16(0)) + sub_mtx.unlock() } sem.destroy() return event_idx