diff --git a/vlib/encoding/csv/csv_reader_random_access.v b/vlib/encoding/csv/csv_reader_random_access.v index 97f953b961..cda3a8d7cd 100644 --- a/vlib/encoding/csv/csv_reader_random_access.v +++ b/vlib/encoding/csv/csv_reader_random_access.v @@ -43,6 +43,7 @@ pub mut: htype ColumType = .string } +@[heap] pub struct RandomAccessReader { pub mut: index i64 @@ -71,7 +72,8 @@ pub mut: mem_buf_start i64 = -1 // start index in the file of the read buffer mem_buf_end i64 = -1 // end index in the file of the read buffer // csv map for quick access - csv_map [][]i64 + create_map_csv bool = true // flag to enable the csv map creation + csv_map [][]i64 // header header_row int = -1 // row index of the header in the csv_map header_list []HeaderItem // list of the header item @@ -81,19 +83,20 @@ pub mut: @[params] pub struct RandomAccessReaderConfig { pub: - scr_buf voidptr // pointer to the buffer of data - scr_buf_len i64 // if > 0 use the RAM pointed from scr_buf as source of data - file_path string - start_index i64 - end_index i64 = -1 - mem_buf_size int = 1024 * 64 // default buffer size 64KByte - separator u8 = `,` - comment u8 = `#` // every line that start with the quote char is ignored - default_cell string = '*' // return this string if out of the csv boundaries - empty_cell string // return this string if empty cell - end_line_len int = endline_cr_len // size of the endline rune - quote u8 = `"` // double quote is the standard quote char - quote_remove bool // if true clear the cell from the quotes + scr_buf voidptr // pointer to the buffer of data + scr_buf_len i64 // if > 0 use the RAM pointed from scr_buf as source of data + file_path string + start_index i64 + end_index i64 = -1 + mem_buf_size int = 1024 * 64 // default buffer size 64KByte + separator u8 = `,` + comment u8 = `#` // every line that starts with the comment char is ignored + default_cell string = '*' // return this string if out of the csv boundaries + empty_cell string // return this string if empty 
cell + end_line_len int = endline_cr_len // size of the endline rune + quote u8 = `"` // double quote is the standard quote char + quote_remove bool // if true clear the cell from the quotes + create_map_csv bool = true // if true build the csv map (via map_csv) when the reader is created } /****************************************************************************** @@ -177,7 +180,10 @@ pub fn csv_reader(cfg RandomAccessReaderConfig) !&RandomAccessReader { cr.quote_remove = cfg.quote_remove cr.quote = cfg.quote - cr.map_csv()! + cr.create_map_csv = cfg.create_map_csv + if cr.create_map_csv { + cr.map_csv()! + } return cr } @@ -226,6 +232,18 @@ fn (mut cr RandomAccessReader) fill_buffer(i i64) !i64 { return i64(-1) } +// copy_configuration copies the header row index, header list, header map and csv map from another csv RandomAccessReader +// this function is a helper for using the RandomAccessReader in multi threaded applications +// NOTE(review): the lists/maps are assigned from references inside `unsafe`, so they presumably share storage with `src_cr` - pay attention to the free process +pub fn (mut cr RandomAccessReader) copy_configuration(src_cr RandomAccessReader) { + cr.header_row = src_cr.header_row + unsafe { + cr.header_list = &src_cr.header_list + cr.header_map = &src_cr.header_map + cr.csv_map = &src_cr.csv_map + } +} + /****************************************************************************** * * Csv mapper, mapped reader @@ -248,8 +266,15 @@ pub fn (mut cr RandomAccessReader) map_csv() ! { p := &u8(cr.mem_buf) cr.csv_map << []i64{} cr.csv_map[0] << if cr.is_bom_present { 3 } else { 0 } // skip the BOM data + + // mut counter := i64(0) for i < cr.end_index { read_bytes_count := cr.fill_buffer(i)! + + // DEBUG print + // perc := f32(counter) / f32(cr.end_index) * 100.0 + // println("${perc:.2f}") + // println("${i:-12d} of ${cr.f_len:-12d} readed: ${read_bytes_count}") mut p1 := p mut i1 := i64(0) @@ -317,6 +342,7 @@ pub fn (mut cr RandomAccessReader) map_csv() ! 
{ } } i += read_bytes_count + // counter += i1 } } // remove last row if it is not a valid one @@ -428,13 +454,17 @@ pub fn (mut cr RandomAccessReader) get_cellt(cfg GetCellConfig) !CellValue { if cr.header_row >= 0 && cfg.x < cr.header_list.len { h := cr.header_list[cfg.x] res := cr.get_cell(cfg)! - if h.htype == .int { - return res.int() + match h.htype { + .int { + return res.trim_space().int() + } + .string { + return res + } + .f32 { + return res.trim_space().f32() + } } - if h.htype == .f32 { - return res.f32() - } - return res } return cr.get_cell(cfg)! } diff --git a/vlib/encoding/csv/csv_reader_test.v b/vlib/encoding/csv/csv_reader_test.v index 3d599c08fc..27f129ce98 100644 --- a/vlib/encoding/csv/csv_reader_test.v +++ b/vlib/encoding/csv/csv_reader_test.v @@ -12,6 +12,7 @@ Known limitations: import encoding.csv import strings import os +import rand /****************************************************************************** * @@ -347,3 +348,125 @@ fn test_coherence() { fn main() { test_csv_string() } + +// Multithreaded tests + +fn create_csv(file_path string, size int) !i64 { + // create the csv file for the test: a header row plus `size` data rows; returns the sum of the values written in the `count` column + mut csv_txt := 'pippo,count,count1,pera,sempronio,float' + + mut f := os.open_file(file_path, 'w')! + f.write_string(csv_txt + '\n')! + mut count := i64(0) + for i in 0 .. size { + tmp := "${rand.int()}, ${i}, 3, \"txt1${i}\", \"txt2${i}\", ${f32(rand.u32()) / 1000.0}\n" + f.write_string(tmp)! + // if i % 1_000_000 == 0 { + // println(i) + // } + count += i + } + f.close() + return count +} + +fn read_lines(id int, csvr csv.RandomAccessReader, mut data [][]csv.CellValue, start_row int, end_row int) { + // println(" func ${data.len},${data[1].len}") + unsafe { + for count, col_elem in csvr.header_list { + // println("Check: ${col_elem}") + match col_elem.htype { + .string { + // println('id:${id} String here') + for row_index in start_row .. 
end_row { + // println("str ${count},${row_index}") + data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or { + panic('Str get_cell failed') + } + } + } + .int { + // println('id:${id} Int here') + for row_index in start_row .. end_row { + // println("int ${count},${row_index}") + data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or { + panic('Int get_cell failed') + }.trim_space().int() + } + } + .f32 { + // println('id:${id} f32 here') + for row_index in start_row .. end_row { + // println("f32 ${count},${row_index}") + data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or { + panic('F32 get_cell failed') + }.trim_space().f32() + } + } + } + } + } // unsafe +} + +fn test_multithreading() { + file_path_str := os.join_path(os.temp_dir(), 'test_csv.csv') + size := 10_000 + + // create the test file + res_count := create_csv(file_path_str, size)! + + slices := 2 // number of slices of the csv (one reader and one thread per slice) + mem_buf_size := 1024 * 1024 * 1 + + mut csvr := []csv.RandomAccessReader{} + + // init first csv reader + csvr << csv.csv_reader(file_path: file_path_str, mem_buf_size: mem_buf_size)! + csvr[0].build_header_dict(csv.GetHeaderConf{})! + + // init other csv readers using the first reader configuration (create_map_csv: false skips mapping the file again) + for _ in 1 .. slices { + mut tmp_csvr := csv.csv_reader( + file_path: file_path_str + mem_buf_size: mem_buf_size + create_map_csv: false + )! + tmp_csvr.copy_configuration(csvr[0]) + csvr << tmp_csvr + } + + // read the data from the csv file, starting from row 1 and giving each thread a range of `step` rows + mut data := [][]csv.CellValue{} + + n_rows := csvr[0].csv_map.len + unsafe { + data = [][]csv.CellValue{len: csvr[0].header_list.len, init: []csv.CellValue{len: n_rows}} + } + step := n_rows / slices + mut start := 1 + mut end := if (start + step) > n_rows { n_rows } else { start + step } + + mut threads := []thread{} + for task_index in 0 .. 
slices { + threads << spawn read_lines(task_index, csvr[task_index], mut &data, start, end) + start = end + end = if (start + step) > n_rows { n_rows } else { start + step } + } + threads.wait() + + // release the csv readers; NOTE(review): csvr[0].csv_map.len is still read below, after dispose - confirm dispose_csv_reader leaves csv_map usable + for mut item in csvr { + item.dispose_csv_reader() + } + + // check that the sum of column 1 (`count`) matches the sum computed while writing the file + mut ck_count := i64(0) + for i in 0 .. csvr[0].csv_map.len - 1 { + ck_count += data[1][i] as int + } + + assert ck_count == res_count, 'check on csv file failed!' + + // remove the temp file + os.rm(file_path_str)! +}