mirror of
https://github.com/vlang/v.git
synced 2025-08-04 02:07:28 -04:00
encoding.csv: add support for multithreading to encoding.csv.RandomAccessReader
(#23677)
This commit is contained in:
parent
f3493e126a
commit
089778e55e
@ -43,6 +43,7 @@ pub mut:
|
|||||||
htype ColumType = .string
|
htype ColumType = .string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@[heap]
|
||||||
pub struct RandomAccessReader {
|
pub struct RandomAccessReader {
|
||||||
pub mut:
|
pub mut:
|
||||||
index i64
|
index i64
|
||||||
@ -71,6 +72,7 @@ pub mut:
|
|||||||
mem_buf_start i64 = -1 // start index in the file of the read buffer
|
mem_buf_start i64 = -1 // start index in the file of the read buffer
|
||||||
mem_buf_end i64 = -1 // end index in the file of the read buffer
|
mem_buf_end i64 = -1 // end index in the file of the read buffer
|
||||||
// csv map for quick access
|
// csv map for quick access
|
||||||
|
create_map_csv bool = true // flag to enable the csv map creation
|
||||||
csv_map [][]i64
|
csv_map [][]i64
|
||||||
// header
|
// header
|
||||||
header_row int = -1 // row index of the header in the csv_map
|
header_row int = -1 // row index of the header in the csv_map
|
||||||
@ -94,6 +96,7 @@ pub:
|
|||||||
end_line_len int = endline_cr_len // size of the endline rune
|
end_line_len int = endline_cr_len // size of the endline rune
|
||||||
quote u8 = `"` // double quote is the standard quote char
|
quote u8 = `"` // double quote is the standard quote char
|
||||||
quote_remove bool // if true clear the cell from the quotes
|
quote_remove bool // if true clear the cell from the quotes
|
||||||
|
create_map_csv bool = true // if true make the map of the csv file
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
@ -177,7 +180,10 @@ pub fn csv_reader(cfg RandomAccessReaderConfig) !&RandomAccessReader {
|
|||||||
cr.quote_remove = cfg.quote_remove
|
cr.quote_remove = cfg.quote_remove
|
||||||
cr.quote = cfg.quote
|
cr.quote = cfg.quote
|
||||||
|
|
||||||
|
cr.create_map_csv = cfg.create_map_csv
|
||||||
|
if cr.create_map_csv {
|
||||||
cr.map_csv()!
|
cr.map_csv()!
|
||||||
|
}
|
||||||
|
|
||||||
return cr
|
return cr
|
||||||
}
|
}
|
||||||
@ -226,6 +232,18 @@ fn (mut cr RandomAccessReader) fill_buffer(i i64) !i64 {
|
|||||||
return i64(-1)
|
return i64(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy_configuration copies the configuration from another csv RandomAccessReader
|
||||||
|
// this function is a helper for using the RandomAccessReader in multi threaded applications
|
||||||
|
// pay attention to the free process
|
||||||
|
pub fn (mut cr RandomAccessReader) copy_configuration(src_cr RandomAccessReader) {
|
||||||
|
cr.header_row = src_cr.header_row
|
||||||
|
unsafe {
|
||||||
|
cr.header_list = &src_cr.header_list
|
||||||
|
cr.header_map = &src_cr.header_map
|
||||||
|
cr.csv_map = &src_cr.csv_map
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
*
|
*
|
||||||
* Csv mapper, mapped reader
|
* Csv mapper, mapped reader
|
||||||
@ -248,8 +266,15 @@ pub fn (mut cr RandomAccessReader) map_csv() ! {
|
|||||||
p := &u8(cr.mem_buf)
|
p := &u8(cr.mem_buf)
|
||||||
cr.csv_map << []i64{}
|
cr.csv_map << []i64{}
|
||||||
cr.csv_map[0] << if cr.is_bom_present { 3 } else { 0 } // skip the BOM data
|
cr.csv_map[0] << if cr.is_bom_present { 3 } else { 0 } // skip the BOM data
|
||||||
|
|
||||||
|
// mut counter := i64(0)
|
||||||
for i < cr.end_index {
|
for i < cr.end_index {
|
||||||
read_bytes_count := cr.fill_buffer(i)!
|
read_bytes_count := cr.fill_buffer(i)!
|
||||||
|
|
||||||
|
// DEBUG print
|
||||||
|
// perc := f32(counter) / f32(cr.end_index) * 100.0
|
||||||
|
// println("${perc:.2f}")
|
||||||
|
|
||||||
// println("${i:-12d} of ${cr.f_len:-12d} readed: ${read_bytes_count}")
|
// println("${i:-12d} of ${cr.f_len:-12d} readed: ${read_bytes_count}")
|
||||||
mut p1 := p
|
mut p1 := p
|
||||||
mut i1 := i64(0)
|
mut i1 := i64(0)
|
||||||
@ -317,6 +342,7 @@ pub fn (mut cr RandomAccessReader) map_csv() ! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
i += read_bytes_count
|
i += read_bytes_count
|
||||||
|
// counter += i1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// remove last row if it is not a valid one
|
// remove last row if it is not a valid one
|
||||||
@ -428,14 +454,18 @@ pub fn (mut cr RandomAccessReader) get_cellt(cfg GetCellConfig) !CellValue {
|
|||||||
if cr.header_row >= 0 && cfg.x < cr.header_list.len {
|
if cr.header_row >= 0 && cfg.x < cr.header_list.len {
|
||||||
h := cr.header_list[cfg.x]
|
h := cr.header_list[cfg.x]
|
||||||
res := cr.get_cell(cfg)!
|
res := cr.get_cell(cfg)!
|
||||||
if h.htype == .int {
|
match h.htype {
|
||||||
return res.int()
|
.int {
|
||||||
}
|
return res.trim_space().int()
|
||||||
if h.htype == .f32 {
|
|
||||||
return res.f32()
|
|
||||||
}
|
}
|
||||||
|
.string {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
.f32 {
|
||||||
|
return res.trim_space().f32()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return cr.get_cell(cfg)!
|
return cr.get_cell(cfg)!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@ Known limitations:
|
|||||||
import encoding.csv
|
import encoding.csv
|
||||||
import strings
|
import strings
|
||||||
import os
|
import os
|
||||||
|
import rand
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
*
|
*
|
||||||
@ -347,3 +348,125 @@ fn test_coherence() {
|
|||||||
fn main() {
|
fn main() {
|
||||||
test_csv_string()
|
test_csv_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Multithreaded tests
|
||||||
|
|
||||||
|
fn create_csv(file_path string, size int) !i64 {
|
||||||
|
// create csv file for the test
|
||||||
|
mut csv_txt := 'pippo,count,count1,pera,sempronio,float'
|
||||||
|
|
||||||
|
mut f := os.open_file(file_path, 'w')!
|
||||||
|
f.write_string(csv_txt + '\n')!
|
||||||
|
mut count := i64(0)
|
||||||
|
for i in 0 .. size {
|
||||||
|
tmp := "${rand.int()}, ${i}, 3, \"txt1${i}\", \"txt2${i}\", ${f32(rand.u32()) / 1000.0}\n"
|
||||||
|
f.write_string(tmp)!
|
||||||
|
// if i % 1_000_000 == 0 {
|
||||||
|
// println(i)
|
||||||
|
// }
|
||||||
|
count += i
|
||||||
|
}
|
||||||
|
f.close()
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_lines(id int, csvr csv.RandomAccessReader, mut data [][]csv.CellValue, start_row int, end_row int) {
|
||||||
|
// println(" func ${data.len},${data[1].len}")
|
||||||
|
unsafe {
|
||||||
|
for count, col_elem in csvr.header_list {
|
||||||
|
// println("Check: ${col_elem}")
|
||||||
|
match col_elem.htype {
|
||||||
|
.string {
|
||||||
|
// println('id:${id} String here')
|
||||||
|
for row_index in start_row .. end_row {
|
||||||
|
// println("str ${count},${row_index}")
|
||||||
|
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
|
||||||
|
panic('Str get_cell failed')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.int {
|
||||||
|
// println('id:${id} Int here')
|
||||||
|
for row_index in start_row .. end_row {
|
||||||
|
// println("int ${count},${row_index}")
|
||||||
|
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
|
||||||
|
panic('Int get_cell failed')
|
||||||
|
}.trim_space().int()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.f32 {
|
||||||
|
// println('id:${id} f32 here')
|
||||||
|
for row_index in start_row .. end_row {
|
||||||
|
// println("f32 ${count},${row_index}")
|
||||||
|
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
|
||||||
|
panic('F32 get_cell failed')
|
||||||
|
}.trim_space().f32()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // unsafe
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_multithreading() {
|
||||||
|
file_path_str := os.join_path(os.temp_dir(), 'test_csv.csv')
|
||||||
|
size := 10_000
|
||||||
|
|
||||||
|
// create the test file
|
||||||
|
res_count := create_csv(file_path_str, size)!
|
||||||
|
|
||||||
|
slices := 2 // number of slice of the csv
|
||||||
|
mem_buf_size := 1024 * 1024 * 1
|
||||||
|
|
||||||
|
mut csvr := []csv.RandomAccessReader{}
|
||||||
|
|
||||||
|
// init first csv reader
|
||||||
|
csvr << csv.csv_reader(file_path: file_path_str, mem_buf_size: mem_buf_size)!
|
||||||
|
csvr[0].build_header_dict(csv.GetHeaderConf{})!
|
||||||
|
|
||||||
|
// init other csv readers using the first reader configuration
|
||||||
|
for _ in 1 .. slices {
|
||||||
|
mut tmp_csvr := csv.csv_reader(
|
||||||
|
file_path: file_path_str
|
||||||
|
mem_buf_size: mem_buf_size
|
||||||
|
create_map_csv: false
|
||||||
|
)!
|
||||||
|
tmp_csvr.copy_configuration(csvr[0])
|
||||||
|
csvr << tmp_csvr
|
||||||
|
}
|
||||||
|
|
||||||
|
// read the data from the csv file
|
||||||
|
mut data := [][]csv.CellValue{}
|
||||||
|
|
||||||
|
n_rows := csvr[0].csv_map.len
|
||||||
|
unsafe {
|
||||||
|
data = [][]csv.CellValue{len: csvr[0].header_list.len, init: []csv.CellValue{len: n_rows}}
|
||||||
|
}
|
||||||
|
step := n_rows / slices
|
||||||
|
mut start := 1
|
||||||
|
mut end := if (start + step) > n_rows { n_rows } else { start + step }
|
||||||
|
|
||||||
|
mut threads := []thread{}
|
||||||
|
for task_index in 0 .. slices {
|
||||||
|
threads << spawn read_lines(task_index, csvr[task_index], mut &data, start, end)
|
||||||
|
start = end
|
||||||
|
end = if (start + step) > n_rows { n_rows } else { start + step }
|
||||||
|
}
|
||||||
|
threads.wait()
|
||||||
|
|
||||||
|
// release the csv readers
|
||||||
|
for mut item in csvr {
|
||||||
|
item.dispose_csv_reader()
|
||||||
|
}
|
||||||
|
|
||||||
|
// check for the integer column sum
|
||||||
|
mut ck_count := i64(0)
|
||||||
|
for i in 0 .. csvr[0].csv_map.len - 1 {
|
||||||
|
ck_count += data[1][i] as int
|
||||||
|
}
|
||||||
|
|
||||||
|
assert ck_count == res_count, 'check on csv file failed!'
|
||||||
|
|
||||||
|
// remove the temp file
|
||||||
|
os.rm(file_path_str)!
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user