mirror of
https://github.com/vlang/v.git
synced 2025-08-03 09:47:15 -04:00
db: add redis (#24730)
This commit is contained in:
parent
9fb205a61c
commit
ef279f67a9
149
vlib/db/redis/README.md
Normal file
149
vlib/db/redis/README.md
Normal file
@ -0,0 +1,149 @@
|
||||
# Redis Client for V
|
||||
|
||||
This module provides a Redis client implementation in V that supports the
|
||||
Redis Serialization Protocol (RESP) with a type-safe interface for common Redis commands.
|
||||
|
||||
## Features
|
||||
|
||||
- **Type-Safe Commands**: Auto-detects value types at compile time
|
||||
- **Pipeline Support**: Group commands for batch execution
|
||||
- **Connection Pooling**: Efficient resource management
|
||||
- **RESP Protocol Support**: Full Redis Serialization Protocol implementation
|
||||
- **Memory Efficient**: Pre-allocated buffers for minimal allocations
|
||||
|
||||
## Quick Start
|
||||
|
||||
```v
|
||||
module main
|
||||
|
||||
import db.redis
|
||||
|
||||
fn main() {
|
||||
// Connect to Redis
|
||||
mut db := redis.connect(redis.Config{
|
||||
host: 'localhost'
|
||||
port: 6379
|
||||
})!
|
||||
|
||||
// Set and get values
|
||||
db.set('name', 'Alice')!
|
||||
name := db.get[string]('name')!
|
||||
println('Name: ${name}') // Output: Name: Alice
|
||||
|
||||
// Integer operations
|
||||
db.set('counter', 42)!
|
||||
db.incr('counter')!
|
||||
counter := db.get[int]('counter')!
|
||||
println('Counter: ${counter}') // Output: Counter: 43
|
||||
|
||||
// Clean up
|
||||
db.close()!
|
||||
}
|
||||
```
|
||||
|
||||
## Supported Commands
|
||||
|
||||
### Key Operations
|
||||
|
||||
```v ignore
|
||||
// Set value
|
||||
db.set('key', 'value')!
|
||||
db.set('number', 42)!
|
||||
db.set('binary', []u8{len: 4, init: 0})!
|
||||
|
||||
// Get value
|
||||
str_value := db.get[string]('key')!
|
||||
int_value := db.get[int]('number')!
|
||||
bin_value := db.get[[]u8]('binary')!
|
||||
|
||||
// Delete key
|
||||
db.del('key')!
|
||||
|
||||
// Set expiration
|
||||
db.expire('key', 60)! // 60 seconds
|
||||
```
|
||||
|
||||
### Hash Operations
|
||||
|
||||
```v ignore
|
||||
// Set hash fields
|
||||
db.hset('user:1', {
|
||||
'name': 'Bob',
|
||||
'age': 30,
|
||||
})!
|
||||
|
||||
// Get single field
|
||||
name := db.hget[string]('user:1', 'name')!
|
||||
|
||||
// Get all fields
|
||||
user_data := db.hgetall[string]('user:1')!
|
||||
println(user_data) // Output: {'name': 'Bob', 'age': '30'}
|
||||
```
|
||||
|
||||
### Pipeline Operations
|
||||
|
||||
```v ignore
|
||||
// Start pipeline
|
||||
db.pipeline_start()
|
||||
|
||||
// Queue commands
|
||||
db.incr('counter')
|
||||
db.set('name', 'Charlie')
|
||||
db.get[string]('name')
|
||||
|
||||
// Execute and get responses
|
||||
responses := db.pipeline_execute()!
|
||||
for resp in responses {
|
||||
println(resp)
|
||||
}
|
||||
```
|
||||
|
||||
### Custom Commands
|
||||
|
||||
```v ignore
|
||||
// Run raw commands
|
||||
resp := db.cmd('SET', 'custom', 'value')!
|
||||
result := db.cmd('GET', 'custom')!
|
||||
|
||||
// Complex commands
|
||||
db.cmd('HSET', 'user:2', 'field1', 'value1', 'field2', '42')!
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
All functions return `!` types and will return detailed errors for:
|
||||
|
||||
- Connection issues
|
||||
- Protocol violations
|
||||
- Type mismatches
|
||||
- Redis error responses
|
||||
- Timeout conditions
|
||||
|
||||
```v ignore
|
||||
result := db.get[string]('nonexistent') or {
|
||||
println('Key not found')
|
||||
return
|
||||
}
|
||||
```
|
||||
|
||||
## Connection Management
|
||||
|
||||
```v ignore
|
||||
config := redis.Config{
|
||||
host: 'redis.server'
|
||||
port: 6379
|
||||
version: 2 // RESP2 protocol
|
||||
}
|
||||
|
||||
mut db := redis.connect(config)!
|
||||
defer {
|
||||
db.close() or { eprintln('Error closing connection: $err') }
|
||||
}
|
||||
```
|
||||
|
||||
## Performance Tips
|
||||
|
||||
1. **Reuse Connections**: Maintain connections instead of creating new ones
|
||||
2. **Use Pipelines**: Batch commands for high-throughput operations
|
||||
3. **Prefer Integers**: Use numeric types for counters and metrics
|
||||
4. **Specify Types**: Always specify return types for get operations
|
549
vlib/db/redis/redis.v
Normal file
549
vlib/db/redis/redis.v
Normal file
@ -0,0 +1,549 @@
|
||||
// https://redis.io/docs/latest/develop/reference/protocol-spec/
|
||||
|
||||
module redis
|
||||
|
||||
import net
|
||||
import strings
|
||||
|
||||
// RedisValue represents all possible RESP (Redis Serialization Protocol) data types
|
||||
pub type RedisValue = string | i64 | bool | f32 | f64 | []u8 | RedisNull | []RedisValue
|
||||
|
||||
// RedisNull represents the Redis NULL type
|
||||
pub struct RedisNull {}
|
||||
|
||||
const cmd_buf_pre_allocate_len = 4096 // Initial buffer size for command building
|
||||
const resp_buf_pre_allocate_len = 8192 // Initial buffer size for response reading
|
||||
|
||||
// DB represents a Redis database connection
|
||||
pub struct DB {
|
||||
pub mut:
|
||||
version int // RESP protocol version
|
||||
mut:
|
||||
conn &net.TcpConn = unsafe { nil } // TCP connection to Redis
|
||||
|
||||
// Pre-allocated buffers to reduce memory allocations
|
||||
cmd_buf []u8 // Buffer for building commands
|
||||
resp_buf []u8 // Buffer for reading responses
|
||||
pipeline_mode bool
|
||||
pipeline_buffer []u8
|
||||
pipeline_cmd_count int
|
||||
}
|
||||
|
||||
// Configuration options for Redis connection
|
||||
@[params]
|
||||
pub struct Config {
|
||||
pub mut:
|
||||
host string = '127.0.0.1' // Redis server host
|
||||
port u16 = 6379 // Redis server port
|
||||
version int = 2 // RESP protocol version (default: v2)
|
||||
}
|
||||
|
||||
// connect establishes a connection to a Redis server
|
||||
pub fn connect(config Config) !DB {
|
||||
conn := net.dial_tcp('${config.host}:${config.port}')!
|
||||
return DB{
|
||||
conn: conn
|
||||
version: config.version
|
||||
cmd_buf: []u8{cap: cmd_buf_pre_allocate_len}
|
||||
resp_buf: []u8{cap: resp_buf_pre_allocate_len}
|
||||
}
|
||||
}
|
||||
|
||||
// close terminates the connection to Redis server
|
||||
pub fn (mut db DB) close() ! {
|
||||
db.conn.close()!
|
||||
}
|
||||
|
||||
// ping sends a PING command to verify server responsiveness
|
||||
pub fn (mut db DB) ping() !string {
|
||||
db.conn.write_string('*1\r\n$4\r\nPING\r\n')!
|
||||
return db.read_response()! as string
|
||||
}
|
||||
|
||||
// del deletes a `key`
|
||||
pub fn (mut db DB) del(key string) !i64 {
|
||||
// *2\r\n$3\r\nDEL\r\n$6\r\ncounter\r\n
|
||||
// send cmd
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*2\r\n$3\r\nDEL\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
|
||||
// read resp
|
||||
return db.read_response()! as i64
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// set stores a `key`-value` pair in Redis. Supported value types: number, string, []u8
|
||||
pub fn (mut db DB) set[T](key string, value T) !string {
|
||||
// *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\nVlang\r\n
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*3\r\n$3\r\nSET\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
$if T is $int {
|
||||
val_str := value.str()
|
||||
db.cmd_buf << '$${val_str.len}\r\n${val_str}'.bytes()
|
||||
} $else $if T is string {
|
||||
db.cmd_buf << '$${value.len}\r\n${value}'.bytes()
|
||||
} $else $if T is []u8 {
|
||||
db.cmd_buf << '$${value.len}\r\n'.bytes()
|
||||
db.cmd_buf << value
|
||||
} $else {
|
||||
return error('`set()`: unsupported value type. Allowed: number, string, []u8')
|
||||
}
|
||||
db.cmd_buf << '\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
return db.read_response()! as string
|
||||
}
|
||||
return ''
|
||||
}
|
||||
|
||||
// get retrieves the value of a `key`. Supported return types: string, int, []u8
|
||||
pub fn (mut db DB) get[T](key string) !T {
|
||||
// *2\r\n$3\r\nGET\r\n$4\r\nname\r\n
|
||||
// send cmd
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*2\r\n$3\r\nGET\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
resp := db.read_response()!
|
||||
match resp {
|
||||
[]u8 {
|
||||
$if T is string {
|
||||
return resp.bytestr()
|
||||
} $else $if T is $int {
|
||||
return T(resp.bytestr().i64())
|
||||
} $else $if T is []u8 {
|
||||
return resp
|
||||
}
|
||||
}
|
||||
RedisNull {
|
||||
return error('`get()`: key ${key} not found')
|
||||
}
|
||||
else {
|
||||
return error('`get()`: unexpected response type')
|
||||
}
|
||||
}
|
||||
return error('`get()`: unsupported data type')
|
||||
}
|
||||
return T{}
|
||||
}
|
||||
|
||||
// incr increments the integer value of a `key` by 1
|
||||
pub fn (mut db DB) incr(key string) !i64 {
|
||||
// *2\r\n$4\r\nINCR\r\n$6\r\ncounter\r\n
|
||||
// send cmd
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*2\r\n$4\r\nINCR\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
|
||||
// read resp
|
||||
return db.read_response()! as i64
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// decr decrements the integer value of a `key` by 1
|
||||
pub fn (mut db DB) decr(key string) !i64 {
|
||||
// *2\r\n$4\r\nDECR\r\n$6\r\ncounter\r\n
|
||||
// send cmd
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*2\r\n$4\r\nDECR\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
|
||||
// read resp
|
||||
return db.read_response()! as i64
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// hset sets multiple `key`-`value` pairs in a hash. Supported value types: string, int, []u8
|
||||
pub fn (mut db DB) hset[T](key string, m map[string]T) !int {
|
||||
// HSET user:1 name "John" age 30
|
||||
// *6\r\n$4\r\nHSET\r\n$6\r\nuser:1\r\n$4\r\nname\r\n$4\r\nJohn\r\n$3\r\nage\r\n$2\r\n30\r\n
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*${2 + m.len * 2}\r\n$4\r\nHSET\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
for k, v in m {
|
||||
db.cmd_buf << '$${k.len}\r\n${k}\r\n'.bytes()
|
||||
$if T is string {
|
||||
db.cmd_buf << '$${v.len}\r\n${v}\r\n'.bytes()
|
||||
} $else $if T is $int {
|
||||
v_str := v.str()
|
||||
db.cmd_buf << '$${v_str.len}\r\n${v_str}\r\n'.bytes()
|
||||
} $else $if T is []u8 {
|
||||
db.cmd_buf << '$${v.len}\r\n$'.bytes()
|
||||
db.cmd_buf << v
|
||||
db.cmd_buf << '\r\n'.bytes()
|
||||
} $else {
|
||||
return error('`hset()`: unsupported value type. Allowed: number, string, []u8')
|
||||
}
|
||||
}
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
return int(db.read_response()! as i64)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// hget retrieves the value of a hash field. Supported return types: string, int, []u8
|
||||
pub fn (mut db DB) hget[T](key string, m_key string) !T {
|
||||
// HGET user:1 name
|
||||
// *3\r\n$4\r\nHGET\r\n$6\r\nuser:1\r\n$4\r\nname\r\n
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*3\r\n$4\r\nHGET\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
db.cmd_buf << '$${m_key.len}\r\n${m_key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
resp := db.read_response()! as []u8
|
||||
$if T is string {
|
||||
return resp.bytestr()
|
||||
} $else $if T is $int {
|
||||
return resp.bytestr().i64()
|
||||
} $else $if T is []u8 {
|
||||
return resp
|
||||
}
|
||||
return error('`hget()`: unsupported return type. Allowed: number, string, []u8')
|
||||
}
|
||||
return T{}
|
||||
}
|
||||
|
||||
// hgetall retrieves all fields and values of a hash. Supported value types: string, int, []u8
|
||||
pub fn (mut db DB) hgetall[T](key string) !map[string]T {
|
||||
// HGETALL user:1
|
||||
// *2\r\n$7\r\nHGETALL\r\n$6\r\nuser:1\r\n
|
||||
$if T !is string && T !is $int && T !is []u8 {
|
||||
return error('`hgetall()`: unsupported value type. Allowed: number, string, []u8')
|
||||
}
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*2\r\n$7\r\nHGETALL\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
resp := db.read_response()!
|
||||
match resp {
|
||||
[]RedisValue {
|
||||
mut result := map[string]T{}
|
||||
elements := resp
|
||||
|
||||
if elements.len % 2 != 0 {
|
||||
return error('`hgetall()`: invalid HGETALL response format')
|
||||
}
|
||||
|
||||
for i in 0 .. elements.len / 2 {
|
||||
key_resp := elements[2 * i] as []u8
|
||||
val_resp := elements[2 * i + 1] as []u8
|
||||
|
||||
k := key_resp.bytestr()
|
||||
|
||||
$if T is string {
|
||||
result[k] = val_resp.bytestr()
|
||||
} $else $if T is $int {
|
||||
result[k] = val_resp.bytestr().i64()
|
||||
} $else $if T is []u8 {
|
||||
result[k] = val_resp
|
||||
} $else {
|
||||
error('`hgetall()`: invalid value type for map: ${T.type_name()}')
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
else {
|
||||
return error('`hgetall()`: expected array response, got: ${resp.type_name()}')
|
||||
}
|
||||
}
|
||||
}
|
||||
return map[string]T{}
|
||||
}
|
||||
|
||||
// expire sets a `key`'s time to live in `seconds`
|
||||
pub fn (mut db DB) expire(key string, seconds int) !bool {
|
||||
// *3\r\n$6\r\nEXPIRE\r\n$6\r\ncounter\r\n$3\r\n600\r\n
|
||||
// send cmd
|
||||
seconds_str := seconds.str()
|
||||
db.cmd_buf.clear()
|
||||
db.cmd_buf << '*3\r\n$6\r\nEXPIRE\r\n$${key.len}\r\n${key}\r\n'.bytes()
|
||||
db.cmd_buf << '$${seconds_str.len}\r\n${seconds_str}\r\n'.bytes()
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << db.cmd_buf
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
db.conn.write(db.cmd_buf)!
|
||||
|
||||
// read resp
|
||||
resp := db.read_response()! as i64
|
||||
return resp != 0
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// read_response_bulk_string handles Redis bulk string responses (format: $<length>\r\n<data>\r\n)
|
||||
fn (mut db DB) read_response_bulk_string() !RedisValue {
|
||||
mut data_length := i64(-1)
|
||||
mut chunk := []u8{len: 1}
|
||||
|
||||
db.resp_buf.clear()
|
||||
for {
|
||||
bytes_read := db.conn.read(mut chunk) or {
|
||||
return error('`read_response_bulk_string()`: connection error ${err}')
|
||||
}
|
||||
if bytes_read == 0 {
|
||||
return error('`read_response_bulk_string()`: connection closed prematurely')
|
||||
}
|
||||
db.resp_buf << chunk[0]
|
||||
|
||||
if chunk[0] == `\n` {
|
||||
break
|
||||
}
|
||||
if (chunk[0] < `0` || chunk[0] > `9`) && chunk[0] != `\r` && chunk[0] != `-` {
|
||||
return error('`read_response_bulk_string()`: invalid bulk string header')
|
||||
}
|
||||
}
|
||||
|
||||
if db.resp_buf.len < 2 {
|
||||
return error('`read_response_bulk_string()`: bulk string header too short')
|
||||
}
|
||||
|
||||
data_length = db.resp_buf[0..db.resp_buf.len - 2].bytestr().i64()
|
||||
|
||||
if data_length == 0 {
|
||||
mut terminator := []u8{len: 2}
|
||||
db.conn.read(mut terminator)!
|
||||
if terminator[0] != `\r` || terminator[1] != `\n` {
|
||||
return error('invalid terminator for empty string')
|
||||
}
|
||||
return []u8{}
|
||||
}
|
||||
if data_length == -1 {
|
||||
return RedisNull{}
|
||||
}
|
||||
|
||||
mut data_buf := []u8{len: int(data_length) + 2} // +2 for ending \r\n
|
||||
mut total_read := 0
|
||||
|
||||
for total_read < data_buf.len {
|
||||
remaining := data_buf.len - total_read
|
||||
chunk_size := if remaining > 1 { 1 } else { remaining }
|
||||
mut chunk_ptr := unsafe { &data_buf[total_read] }
|
||||
|
||||
bytes_read := db.conn.read_ptr(chunk_ptr, chunk_size)!
|
||||
total_read += bytes_read
|
||||
|
||||
if bytes_read == 0 && total_read < data_buf.len {
|
||||
return error('`read_response_bulk_string()`: incomplete data: read ${total_read} / ${data_buf.len} bytes')
|
||||
}
|
||||
}
|
||||
|
||||
// must ending with CRLF
|
||||
if data_buf[data_length] != `\r` || data_buf[data_length + 1] != `\n` {
|
||||
return error('`read_response_bulk_string()`: invalid data terminator')
|
||||
}
|
||||
|
||||
return data_buf[0..data_length].clone()
|
||||
}
|
||||
|
||||
// read_response_i64 handles Redis integer responses (format: :<number>\r\n)
|
||||
fn (mut db DB) read_response_i64() !i64 {
|
||||
db.resp_buf.clear()
|
||||
unsafe { db.resp_buf.grow_len(resp_buf_pre_allocate_len) }
|
||||
mut total_read := 0
|
||||
|
||||
for total_read < db.resp_buf.len {
|
||||
remaining := db.resp_buf.len - total_read
|
||||
chunk_size := if remaining > 1 { 1 } else { remaining }
|
||||
mut chunk_ptr := unsafe { &db.resp_buf[total_read] }
|
||||
|
||||
bytes_read := db.conn.read_ptr(chunk_ptr, chunk_size)!
|
||||
total_read += bytes_read
|
||||
|
||||
if total_read > 2 {
|
||||
if db.resp_buf[total_read - 2] == `\r` && db.resp_buf[total_read - 1] == `\n` {
|
||||
break
|
||||
}
|
||||
}
|
||||
if bytes_read == 0 {
|
||||
return error('`read_response_i64()`: incomplete data: read ${total_read} bytes')
|
||||
}
|
||||
}
|
||||
ret_val := db.resp_buf[0..total_read - 2].bytestr().i64()
|
||||
return ret_val
|
||||
}
|
||||
|
||||
// read_response_simple_string handles Redis simple string responses (format: +<string>\r\n)
|
||||
fn (mut db DB) read_response_simple_string() !string {
|
||||
db.resp_buf.clear()
|
||||
unsafe { db.resp_buf.grow_len(resp_buf_pre_allocate_len) }
|
||||
mut total_read := 0
|
||||
|
||||
for total_read < db.resp_buf.len {
|
||||
remaining := db.resp_buf.len - total_read
|
||||
chunk_size := if remaining > 1 { 1 } else { remaining }
|
||||
mut chunk_ptr := unsafe { &db.resp_buf[total_read] }
|
||||
|
||||
bytes_read := db.conn.read_ptr(chunk_ptr, chunk_size)!
|
||||
total_read += bytes_read
|
||||
|
||||
if total_read > 2 {
|
||||
if db.resp_buf[total_read - 2] == `\r` && db.resp_buf[total_read - 1] == `\n` {
|
||||
break
|
||||
}
|
||||
}
|
||||
if bytes_read == 0 {
|
||||
return error('`read_response_simple_string()`: incomplete data: read ${total_read} bytes')
|
||||
}
|
||||
}
|
||||
return db.resp_buf[0..total_read - 2].bytestr()
|
||||
}
|
||||
|
||||
// read_response_array handles Redis array responses (format: *<length>\r\n<elements>)
|
||||
fn (mut db DB) read_response_array() !RedisValue {
|
||||
mut array_len := i64(-1)
|
||||
mut chunk := []u8{len: 1}
|
||||
|
||||
db.resp_buf.clear()
|
||||
for {
|
||||
bytes_read := db.conn.read(mut chunk) or {
|
||||
return error('`read_response_array()`: connection error: ${err}')
|
||||
}
|
||||
if bytes_read == 0 {
|
||||
return error('`read_response_array()`: connection closed prematurely')
|
||||
}
|
||||
db.resp_buf << chunk[0]
|
||||
|
||||
if chunk[0] == `\n` {
|
||||
break
|
||||
}
|
||||
if (chunk[0] < `0` || chunk[0] > `9`) && chunk[0] != `\r` && chunk[0] != `-` {
|
||||
return error('`read_response_array()`: invalid array header')
|
||||
}
|
||||
}
|
||||
|
||||
if db.resp_buf.len < 2 {
|
||||
return error('`read_response_array()`: array header too short')
|
||||
}
|
||||
|
||||
array_len = db.resp_buf[0..db.resp_buf.len - 2].bytestr().i64() // 排除\r\n
|
||||
|
||||
if array_len == -1 {
|
||||
return RedisNull{}
|
||||
}
|
||||
if array_len == 0 {
|
||||
return []RedisValue{}
|
||||
}
|
||||
|
||||
mut elements := []RedisValue{cap: int(array_len)}
|
||||
for _ in 0 .. array_len {
|
||||
element := db.read_response() or {
|
||||
return error('`read_response_array()`: failed to read array element: ${err}')
|
||||
}
|
||||
elements << element
|
||||
}
|
||||
return elements
|
||||
}
|
||||
|
||||
// read_response handles all types of Redis responses
|
||||
fn (mut db DB) read_response() !RedisValue {
|
||||
db.resp_buf.clear()
|
||||
unsafe { db.resp_buf.grow_len(1) }
|
||||
read_len := db.conn.read(mut db.resp_buf)!
|
||||
if read_len != 1 {
|
||||
return error('`read_response()`: empty response from server')
|
||||
}
|
||||
match db.resp_buf[0] {
|
||||
`+` { // Simple string
|
||||
return db.read_response_simple_string()!
|
||||
}
|
||||
`-` { // Error message
|
||||
msg := db.read_response_simple_string()!
|
||||
return error(msg)
|
||||
}
|
||||
`:` { // Integer
|
||||
return db.read_response_i64()!
|
||||
}
|
||||
`$` { // Bulk string
|
||||
return db.read_response_bulk_string()!
|
||||
}
|
||||
`*` { // Array
|
||||
return db.read_response_array()!
|
||||
}
|
||||
else {
|
||||
return error('`read_response()`: unknown response prefix: ${db.resp_buf.bytestr()}')
|
||||
}
|
||||
}
|
||||
return error('`read_response()`: unreachable code')
|
||||
}
|
||||
|
||||
// cmd sends a custom command to Redis server
|
||||
// for example: db.cmd('SET', 'key', 'value')!
|
||||
pub fn (mut db DB) cmd(cmd ...string) !RedisValue {
|
||||
mut sb := strings.new_builder(cmd.len * 20)
|
||||
sb.write_string('*${cmd.len}\r\n') // Command array header
|
||||
for arg in cmd {
|
||||
sb.write_string('$${arg.len}\r\n${arg}\r\n')
|
||||
}
|
||||
if db.pipeline_mode {
|
||||
db.pipeline_buffer << unsafe { sb.reuse_as_plain_u8_array() }
|
||||
db.pipeline_cmd_count++
|
||||
} else {
|
||||
unsafe { db.conn.write(sb.reuse_as_plain_u8_array())! }
|
||||
return db.read_response()!
|
||||
}
|
||||
return RedisNull{}
|
||||
}
|
||||
|
||||
// pipeline_start start a new pipeline
|
||||
pub fn (mut db DB) pipeline_start() {
|
||||
db.pipeline_mode = true
|
||||
db.pipeline_cmd_count = 0
|
||||
db.pipeline_buffer.clear()
|
||||
}
|
||||
|
||||
// pipeline_execute executes the cmds in pipeline at once and retrieves all responses
|
||||
pub fn (mut db DB) pipeline_execute() ![]RedisValue {
|
||||
if !db.pipeline_mode {
|
||||
return error('`pipeline_execute()`: pipeline not started')
|
||||
}
|
||||
defer {
|
||||
db.pipeline_mode = false
|
||||
}
|
||||
if db.pipeline_buffer.len == 0 {
|
||||
return []RedisValue{}
|
||||
}
|
||||
|
||||
db.conn.write(db.pipeline_buffer)!
|
||||
|
||||
mut results := []RedisValue{cap: db.pipeline_cmd_count}
|
||||
for _ in 0 .. db.pipeline_cmd_count {
|
||||
results << db.read_response()!
|
||||
}
|
||||
|
||||
// reset to non-pipeline mode
|
||||
db.pipeline_mode = false
|
||||
db.pipeline_cmd_count = 0
|
||||
return results
|
||||
}
|
126
vlib/db/redis/redis_test.v
Normal file
126
vlib/db/redis/redis_test.v
Normal file
@ -0,0 +1,126 @@
|
||||
// vtest build: present_redis?
|
||||
import db.redis
|
||||
import time
|
||||
|
||||
fn test_redis() {
|
||||
mut db := redis.connect() or { panic(err) }
|
||||
assert db.ping()! == 'PONG'
|
||||
|
||||
// delete all keys first
|
||||
db.cmd('FLUSHALL')!
|
||||
|
||||
// test set[T]
|
||||
assert db.set('int', 123)! == 'OK'
|
||||
assert db.set('string', 'abc')! == 'OK'
|
||||
assert db.set('bin', [u8(0x00), 0x01, 0x02, 0x03])! == 'OK'
|
||||
|
||||
// test get[T]
|
||||
assert db.get[i64]('int')! == 123
|
||||
assert db.get[string]('string')! == 'abc'
|
||||
assert db.get[[]u8]('bin')! == [u8(0x00), 0x01, 0x02, 0x03]
|
||||
|
||||
// test incr/decr
|
||||
assert db.incr('int')! == 124
|
||||
assert db.decr('int')! == 123
|
||||
|
||||
// test hset/hget/hgetall
|
||||
m := {
|
||||
'a': '1'
|
||||
'b': '2'
|
||||
'c': '3'
|
||||
}
|
||||
assert db.hset('map', m)! == m.len
|
||||
assert db.hget[string]('map', 'a')! == '1'
|
||||
assert db.hget[string]('map', 'b')! == '2'
|
||||
assert db.hget[string]('map', 'c')! == '3'
|
||||
assert db.hgetall[string]('map')! == m
|
||||
|
||||
// test expire
|
||||
assert db.expire('int', 1)!
|
||||
time.sleep(2 * time.second)
|
||||
_ := db.get[i64]('int') or {
|
||||
assert err.msg().contains('key int not found')
|
||||
0
|
||||
}
|
||||
|
||||
// test del
|
||||
assert db.del('string')! == 1
|
||||
_ := db.get[string]('string') or {
|
||||
assert err.msg().contains('key string not found')
|
||||
''
|
||||
}
|
||||
|
||||
// test custom cmd
|
||||
assert db.cmd('SET', 'bigint', '123456')! as string == 'OK'
|
||||
assert db.cmd('GET', 'bigint')! as []u8 == [u8(49), 50, 51, 52, 53, 54]
|
||||
db.close()!
|
||||
}
|
||||
|
||||
fn test_redis_pipeline() {
|
||||
mut db := redis.connect() or { panic(err) }
|
||||
assert db.ping()! == 'PONG'
|
||||
|
||||
// start pipleline mode
|
||||
db.pipeline_start()
|
||||
// delete all keys first
|
||||
db.cmd('FLUSHALL')!
|
||||
a := db.pipeline_execute()!
|
||||
assert a == [redis.RedisValue('OK')]
|
||||
|
||||
// restart pipeline again
|
||||
db.pipeline_start()
|
||||
// test set[T]
|
||||
db.set('int', 123)!
|
||||
db.set('string', 'abc')!
|
||||
db.set('bin', [u8(0x00), 0x01, 0x02, 0x03])!
|
||||
|
||||
// test get[T]
|
||||
db.get[i64]('int')!
|
||||
db.get[string]('string')!
|
||||
db.get[[]u8]('bin')!
|
||||
|
||||
// test incr/decr
|
||||
db.incr('int')!
|
||||
db.incr('int')!
|
||||
db.decr('int')!
|
||||
|
||||
// test hset/hget/hgetall
|
||||
m := {
|
||||
'a': '1'
|
||||
'b': '2'
|
||||
'c': '3'
|
||||
}
|
||||
db.hset('map', m)!
|
||||
db.hget[string]('map', 'a')!
|
||||
db.hget[string]('map', 'b')!
|
||||
db.hget[string]('map', 'c')!
|
||||
db.hgetall[string]('map')!
|
||||
|
||||
// test custom cmd
|
||||
db.cmd('SET', 'bigint', '123456')!
|
||||
db.cmd('GET', 'bigint')!
|
||||
|
||||
b := db.pipeline_execute()!
|
||||
assert b == [
|
||||
redis.RedisValue('OK'),
|
||||
redis.RedisValue('OK'),
|
||||
redis.RedisValue('OK'),
|
||||
redis.RedisValue([u8(49), 50, 51]),
|
||||
redis.RedisValue([u8(97), 98, 99]),
|
||||
redis.RedisValue([u8(0), 1, 2, 3]),
|
||||
redis.RedisValue(i64(124)),
|
||||
redis.RedisValue(i64(125)),
|
||||
redis.RedisValue(i64(124)),
|
||||
redis.RedisValue(i64(3)),
|
||||
redis.RedisValue([u8(49)]),
|
||||
redis.RedisValue([u8(50)]),
|
||||
redis.RedisValue([u8(51)]),
|
||||
redis.RedisValue([redis.RedisValue([u8(97)]), redis.RedisValue([u8(49)]),
|
||||
redis.RedisValue([u8(98)]), redis.RedisValue([u8(50)]), redis.RedisValue([u8(99)]),
|
||||
redis.RedisValue([u8(51)])]),
|
||||
redis.RedisValue('OK'),
|
||||
redis.RedisValue([u8(49), 50, 51, 52, 53, 54]),
|
||||
]
|
||||
|
||||
db.close()!
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user