From 0f9da86003b6b9fe472c201d758dbba570a110cf Mon Sep 17 00:00:00 2001 From: Xe Iaso Date: Wed, 2 Jul 2025 23:12:27 +0000 Subject: [PATCH] feat(lib): implement store interface Signed-off-by: Xe Iaso --- lib/store/decaymap.go | 61 ++++++++++++++++++++++++++++++ lib/store/interface.go | 76 ++++++++++++++++++++++++++++++++++++++ lib/store/memory/memory.go | 75 +++++++++++++++++++++++++++++++++++++ lib/store/registry.go | 43 +++++++++++++++++++++ 4 files changed, 255 insertions(+) create mode 100644 lib/store/decaymap.go create mode 100644 lib/store/interface.go create mode 100644 lib/store/memory/memory.go create mode 100644 lib/store/registry.go diff --git a/lib/store/decaymap.go b/lib/store/decaymap.go new file mode 100644 index 0000000..71439e9 --- /dev/null +++ b/lib/store/decaymap.go @@ -0,0 +1,61 @@ +package store + +import ( + "context" + "fmt" + "time" + + "github.com/TecharoHQ/anubis/decaymap" +) + +type decayMapStore struct { + store *decaymap.Impl[string, []byte] +} + +func (d *decayMapStore) Delete(_ context.Context, key string) error { + if !d.store.Delete(key) { + return fmt.Errorf("%w: %q", ErrNotFound, key) + } + + return nil +} + +func (d *decayMapStore) Get(_ context.Context, key string) ([]byte, error) { + result, ok := d.store.Get(key) + if !ok { + return nil, fmt.Errorf("%w: %q", ErrNotFound, key) + } + + return result, nil +} + +func (d *decayMapStore) Set(_ context.Context, key string, value []byte, expiry time.Duration) error { + d.store.Set(key, value, expiry) + return nil +} + +func (d *decayMapStore) cleanupThread(ctx context.Context) { + t := time.NewTicker(5 * time.Minute) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + d.store.Cleanup() + } + } +} + +// NewDecayMapStore creates a simple in-memory store. This will not scale +// to multiple Anubis instances. +func NewDecayMapStore(ctx context.Context) Interface { + result := &decayMapStore{ + store: decaymap.New[string, []byte](), + } + + go result.cleanupThread(ctx) + + return result +} diff --git a/lib/store/interface.go b/lib/store/interface.go new file mode 100644 index 0000000..80734fa --- /dev/null +++ b/lib/store/interface.go @@ -0,0 +1,76 @@ +package store + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" +) + +var ( + // ErrNotFound is returned when the store implementation cannot find the value + // for a given key. + ErrNotFound = errors.New("store: key not found") + + // ErrCantDecode is returned when a store adaptor cannot decode the store format + // to a value used by the code. + ErrCantDecode = errors.New("store: can't decode value") + + // ErrCantEncode is returned when a store adaptor cannot encode the value into + // the format that the store uses. + ErrCantEncode = errors.New("store: can't encode value") +) + +// Interface defines the calls that Anubis uses for storage in a local or remote +// datastore. This can be implemented with an in-memory, on-disk, or in-database +// storage backend. +type Interface interface { + // Delete removes a value from the store by key. + Delete(ctx context.Context, key string) error + + // Get returns the value of a key assuming that value exists and has not expired. + Get(ctx context.Context, key string) ([]byte, error) + + // Set puts a value into the store that expires according to its expiry. + Set(ctx context.Context, key string, value []byte, expiry time.Duration) error +} + +func z[T any]() T { + return *new(T) +} + +type JSON[T any] struct { + Underlying Interface +} + +func (j *JSON[T]) Delete(ctx context.Context, key string) error { + return j.Underlying.Delete(ctx, key) +} + +func (j *JSON[T]) Get(ctx context.Context, key string) (T, error) { + data, err := j.Underlying.Get(ctx, key) + if err != nil { + return z[T](), err + } + + var result T + if err := json.Unmarshal(data, &result); err != nil { + return z[T](), fmt.Errorf("%w: %w", ErrCantDecode, err) + } + + return result, nil +} + +func (j *JSON[T]) Set(ctx context.Context, key string, value T, expiry time.Duration) error { + data, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("%w: %w", ErrCantEncode, err) + } + + if err := j.Underlying.Set(ctx, key, data, expiry); err != nil { + return err + } + + return nil +} diff --git a/lib/store/memory/memory.go b/lib/store/memory/memory.go new file mode 100644 index 0000000..8457020 --- /dev/null +++ b/lib/store/memory/memory.go @@ -0,0 +1,75 @@ +package memory + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/TecharoHQ/anubis/decaymap" + "github.com/TecharoHQ/anubis/lib/store" +) + +type factory struct{} + +func (factory) Build(ctx context.Context, _ json.RawMessage) (store.Interface, error) { + return New(ctx), nil +} + +func (factory) Valid(json.RawMessage) error { return nil } + +func init() { + store.Register("memory", factory{}) +} + +type impl struct { + store *decaymap.Impl[string, []byte] +} + +func (i *impl) Delete(_ context.Context, key string) error { + if !i.store.Delete(key) { + return fmt.Errorf("%w: %q", store.ErrNotFound, key) + } + + return nil +} + +func (i *impl) Get(_ context.Context, key string) ([]byte, error) { + result, ok := i.store.Get(key) + if !ok { + return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key) + } + + return result, nil +} + +func (i *impl) Set(_ context.Context, key string, value []byte, expiry time.Duration) error { + i.store.Set(key, value, expiry) + return nil +} + +func (i *impl) cleanupThread(ctx context.Context) { + t := time.NewTicker(5 * time.Minute) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + i.store.Cleanup() + } + } +} + +// NewDecayMapStore creates a simple in-memory store. This will not scale +// to multiple Anubis instances. +func New(ctx context.Context) store.Interface { + result := &impl{ + store: decaymap.New[string, []byte](), + } + + go result.cleanupThread(ctx) + + return result +} diff --git a/lib/store/registry.go b/lib/store/registry.go new file mode 100644 index 0000000..6bacd2a --- /dev/null +++ b/lib/store/registry.go @@ -0,0 +1,43 @@ +package store + +import ( + "context" + "encoding/json" + "sort" + "sync" +) + +var ( + registry map[string]Factory = map[string]Factory{} + regLock sync.RWMutex +) + +type Factory interface { + Build(ctx context.Context, config json.RawMessage) (Interface, error) + Valid(config json.RawMessage) error +} + +func Register(name string, impl Factory) { + regLock.Lock() + defer regLock.Unlock() + + registry[name] = impl +} + +func Get(name string) (Factory, bool) { + regLock.RLock() + defer regLock.RUnlock() + result, ok := registry[name] + return result, ok +} + +func Methods() []string { + regLock.RLock() + defer regLock.RUnlock() + var result []string + for method := range registry { + result = append(result, method) + } + sort.Strings(result) + return result +}