mirror of
https://github.com/TecharoHQ/anubis.git
synced 2025-09-12 14:17:57 -04:00
feat(lib): implement store interface
Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
parent
32afc9c040
commit
0f9da86003
61
lib/store/decaymap.go
Normal file
61
lib/store/decaymap.go
Normal file
@ -0,0 +1,61 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/decaymap"
|
||||
)
|
||||
|
||||
type decayMapStore struct {
|
||||
store *decaymap.Impl[string, []byte]
|
||||
}
|
||||
|
||||
func (d *decayMapStore) Delete(_ context.Context, key string) error {
|
||||
if !d.store.Delete(key) {
|
||||
return fmt.Errorf("%w: %q", ErrNotFound, key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *decayMapStore) Get(_ context.Context, key string) ([]byte, error) {
|
||||
result, ok := d.store.Get(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %q", ErrNotFound, key)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (d *decayMapStore) Set(_ context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
d.store.Set(key, value, expiry)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *decayMapStore) cleanupThread(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
d.store.Cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewDecayMapStore creates a simple in-memory store. This will not scale
|
||||
// to multiple Anubis instances.
|
||||
func NewDecayMapStore(ctx context.Context) Interface {
|
||||
result := &decayMapStore{
|
||||
store: decaymap.New[string, []byte](),
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result
|
||||
}
|
76
lib/store/interface.go
Normal file
76
lib/store/interface.go
Normal file
@ -0,0 +1,76 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNotFound is returned when the store implementation cannot find the value
|
||||
// for a given key.
|
||||
ErrNotFound = errors.New("store: key not found")
|
||||
|
||||
// ErrCantDecode is returned when a store adaptor cannot decode the store format
|
||||
// to a value used by the code.
|
||||
ErrCantDecode = errors.New("store: can't decode value")
|
||||
|
||||
// ErrCantEncode is returned when a store adaptor cannot encode the value into
|
||||
// the format that the store uses.
|
||||
ErrCantEncode = errors.New("store: can't encode value")
|
||||
)
|
||||
|
||||
// Interface defines the calls that Anubis uses for storage in a local or remote
|
||||
// datastore. This can be implemented with an in-memory, on-disk, or in-database
|
||||
// storage backend.
|
||||
type Interface interface {
|
||||
// Delete removes a value from the store by key.
|
||||
Delete(ctx context.Context, key string) error
|
||||
|
||||
// Get returns the value of a key assuming that value exists and has not expired.
|
||||
Get(ctx context.Context, key string) ([]byte, error)
|
||||
|
||||
// Set puts a value into the store that expires according to its expiry.
|
||||
Set(ctx context.Context, key string, value []byte, expiry time.Duration) error
|
||||
}
|
||||
|
||||
func z[T any]() T {
|
||||
return *new(T)
|
||||
}
|
||||
|
||||
type JSON[T any] struct {
|
||||
Underlying Interface
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Delete(ctx context.Context, key string) error {
|
||||
return j.Underlying.Delete(ctx, key)
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Get(ctx context.Context, key string) (T, error) {
|
||||
data, err := j.Underlying.Get(ctx, key)
|
||||
if err != nil {
|
||||
return z[T](), err
|
||||
}
|
||||
|
||||
var result T
|
||||
if err := json.Unmarshal(data, &result); err != nil {
|
||||
return z[T](), fmt.Errorf("%w: %w", ErrCantDecode, err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Set(ctx context.Context, key string, value T, expiry time.Duration) error {
|
||||
data, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w", ErrCantEncode, err)
|
||||
}
|
||||
|
||||
if err := j.Underlying.Set(ctx, key, data, expiry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
75
lib/store/memory/memory.go
Normal file
75
lib/store/memory/memory.go
Normal file
@ -0,0 +1,75 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/decaymap"
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
)
|
||||
|
||||
type factory struct{}
|
||||
|
||||
func (factory) Build(ctx context.Context, _ json.RawMessage) (store.Interface, error) {
|
||||
return New(ctx), nil
|
||||
}
|
||||
|
||||
func (factory) Valid(json.RawMessage) error { return nil }
|
||||
|
||||
func init() {
|
||||
store.Register("memory", factory{})
|
||||
}
|
||||
|
||||
type impl struct {
|
||||
store *decaymap.Impl[string, []byte]
|
||||
}
|
||||
|
||||
func (i *impl) Delete(_ context.Context, key string) error {
|
||||
if !i.store.Delete(key) {
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *impl) Get(_ context.Context, key string) ([]byte, error) {
|
||||
result, ok := i.store.Get(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *impl) Set(_ context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
i.store.Set(key, value, expiry)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *impl) cleanupThread(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
i.store.Cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewDecayMapStore creates a simple in-memory store. This will not scale
|
||||
// to multiple Anubis instances.
|
||||
func New(ctx context.Context) store.Interface {
|
||||
result := &impl{
|
||||
store: decaymap.New[string, []byte](),
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result
|
||||
}
|
43
lib/store/registry.go
Normal file
43
lib/store/registry.go
Normal file
@ -0,0 +1,43 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
registry map[string]Factory = map[string]Factory{}
|
||||
regLock sync.RWMutex
|
||||
)
|
||||
|
||||
type Factory interface {
|
||||
Build(ctx context.Context, config json.RawMessage) (Interface, error)
|
||||
Valid(config json.RawMessage) error
|
||||
}
|
||||
|
||||
func Register(name string, impl Factory) {
|
||||
regLock.Lock()
|
||||
defer regLock.Unlock()
|
||||
|
||||
registry[name] = impl
|
||||
}
|
||||
|
||||
func Get(name string) (Factory, bool) {
|
||||
regLock.RLock()
|
||||
defer regLock.RUnlock()
|
||||
result, ok := registry[name]
|
||||
return result, ok
|
||||
}
|
||||
|
||||
func Methods() []string {
|
||||
regLock.RLock()
|
||||
defer regLock.RUnlock()
|
||||
var result []string
|
||||
for method := range registry {
|
||||
result = append(result, method)
|
||||
}
|
||||
sort.Strings(result)
|
||||
return result
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user