diff --git a/cmd/tools/vtest-self.v b/cmd/tools/vtest-self.v index f0c6778f1e..01421369cd 100644 --- a/cmd/tools/vtest-self.v +++ b/cmd/tools/vtest-self.v @@ -230,9 +230,11 @@ const skip_on_musl = [ 'vlib/gg/draw_fns_api_test.v', 'vlib/v/tests/skip_unused/gg_code.vv', 'vlib/v/tests/c_struct_with_reserved_field_name_test.v', + 'vlib/arrays/parallel/parallel_test.v', ] const skip_on_ubuntu_musl = [ 'do_not_remove', + 'vlib/arrays/parallel/parallel_test.v', //'vlib/v/gen/js/jsgen_test.v', 'vlib/net/http/cookie_test.v', 'vlib/net/http/status_test.v', diff --git a/vlib/arrays/parallel/parallel.v b/vlib/arrays/parallel/parallel.v new file mode 100644 index 0000000000..3e6ed63d00 --- /dev/null +++ b/vlib/arrays/parallel/parallel.v @@ -0,0 +1,111 @@ +module parallel + +import sync +import runtime + +// Params contains the optional parameters that can be passed to `run` and `amap`. +@[params] +pub struct Params { +pub mut: + workers int // 0 by default, so that VJOBS will be used, through runtime.nr_jobs() +} + +fn limited_workers(max_workers int, ilen int) int { + // create a limited amount of workers to handle the load + workers := if max_workers != 0 { max_workers } else { runtime.nr_jobs() } + if ilen < workers { + return ilen + } + return workers +} + +// run lets the user run an array of input with a +// user provided function in parallel. It limits the number of +// worker threads to min(num_workers, num_cpu) +// The function aborts if an error is encountered. +// Example: parallel.run([1, 2, 3, 4, 5], 2, fn (i) { println(i) }) +pub fn run[T](input []T, worker fn (T), opt Params) { + if input.len == 0 { + return + } + workers := limited_workers(opt.workers, input.len) + ch := chan T{cap: workers * 2} + mut wg := sync.new_waitgroup() + wg.add(input.len) + for _ in 0 .. workers { + spawn fn [ch, worker, mut wg] [T]() { + for { + task := <-ch or { break } + worker(task) + wg.done() + } + }() + } + + // put the input into the channel + for i in input { + ch <- i + } + + // wait for all tasks to complete + wg.wait() + ch.close() // this will signal all the workers to exit, and we can return, without having to wait for them to finish +} + +struct Task[T, R] { + idx int + input T + result R +} + +// amap lets the user run an array of input with a +// user provided function in parallel. It limits the number of +// worker threads to max number of cpus. +// The worker function can return a value. The returning array maintains the input order. +// Any error handling should have happened within the worker function. +// Example: squares := parallel.amap([1, 2, 3, 4, 5], 2, fn (i) { return i * i }) +pub fn amap[T, R](input []T, worker fn (T) R, opt Params) []R { + if input.len == 0 { + return [] + } + mut tasks := []Task[T, R]{len: input.len} + // the tasks array will be passed to the closure of each worker by reference, so that it could + // then modify the same tasks: + mut tasks_ref := &tasks + + workers := limited_workers(opt.workers, input.len) + // use a buffered channel for transfering the tasks, that has enough space to keep all the workers busy, + // without blocking the main thread needlessly + ch := chan Task[T, R]{cap: workers * 2} + mut wg := sync.new_waitgroup() + wg.add(input.len) + for _ in 0 .. workers { + spawn fn [ch, worker, mut wg, mut tasks_ref] [T, R]() { + for { + mut task := <-ch or { break } + unsafe { + tasks_ref[task.idx] = Task[T, R]{ + idx: task.idx + input: task.input + result: worker(task.input) + } + } + wg.done() + } + }() + } + + // put the input into the channel + for idx, inp in input { + ch <- Task[T, R]{ + idx: idx + input: inp + } + } + + // wait for all tasks to complete + wg.wait() + ch.close() + tasks.sort(a.idx < b.idx) + return tasks.map(it.result) +} diff --git a/vlib/arrays/parallel/parallel_test.v b/vlib/arrays/parallel/parallel_test.v new file mode 100644 index 0000000000..a06637a76c --- /dev/null +++ b/vlib/arrays/parallel/parallel_test.v @@ -0,0 +1,66 @@ +import arrays.parallel +import rand +import time + +fn test_parallel_run_with_empty_arrays() { + parallel.run([]int{}, fn (x int) {}) + parallel.run([]u8{}, fn (x u8) {}) + parallel.run([]u32{}, fn (x u32) {}, workers: 1000) + assert true +} + +fn test_parallel_amap_with_empty_arrays() { + assert parallel.amap([]int{}, fn (x int) u8 { + return 0 + }) == [] + assert parallel.amap([]u8{}, fn (x u8) int { + return 0 + }) == [] + assert parallel.amap([]u8{}, fn (x u8) int { + return 0 + }, workers: 1000) == [] + assert true +} + +fn test_parallel_run() { + counters := []int{len: 10, init: index} + dump(counters) + mut res := []string{len: 10} + mut pres := &res + parallel.run(counters, fn [mut pres] (i int) { + delay := rand.intn(250) or { 250 } + time.sleep(delay * time.millisecond) + unsafe { + pres[i] = 'task ${i}, delay=${delay}ms' + } + assert true + }) + dump(res) + assert res.len == counters.len +} + +fn test_parallel_amap() { + input := [1, 2, 3, 4, 5, 6, 7, 8, 9] + dump(input) + dump(input.len) + output := parallel.amap(input, fn (i int) int { + delay := rand.intn(250) or { 250 } + time.sleep(delay * time.millisecond) + return i * i + }) + dump(output) + dump(output.len) + assert input.len == output.len + + for i, _ in output { + assert output[i] == input[i] * input[i] + } + + // unordered output validation + assert output.len == input.len + op_sorted := output.sorted() + dump(op_sorted) + for i, op in op_sorted { + assert op == input[i] * input[i] + } +}