mirror of
https://github.com/TecharoHQ/anubis.git
synced 2025-08-03 01:38:14 -04:00
fix(lib/store/bbolt): use a multi-bucket flow instead of a single bucket flow (#761)
* fix(lib/store/bbolt): use a multi-bucket flow instead of a single bucket flow Signed-off-by: Xe Iaso <me@xeiaso.net> * Update metadata check-spelling run (push) for Xe/optimize-bbolt Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com> on-behalf-of: @check-spelling <check-spelling-bot@check-spelling.dev> * fix(lib/store/bbolt): gracefully handle the obsolete anubis bucket in cleanup Signed-off-by: Xe Iaso <me@xeiaso.net> --------- Signed-off-by: Xe Iaso <me@xeiaso.net> Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com>
This commit is contained in:
parent
7d0c58d1a8
commit
f98750b038
1
.github/actions/spelling/expect.txt
vendored
1
.github/actions/spelling/expect.txt
vendored
@ -31,6 +31,7 @@ botstopper
|
||||
BPort
|
||||
Brightbot
|
||||
broked
|
||||
byteslice
|
||||
Bytespider
|
||||
cachebuster
|
||||
cachediptoasn
|
||||
|
@ -289,10 +289,9 @@ When Anubis opens a bbolt database, it takes an exclusive lock on that database.
|
||||
|
||||
The `bbolt` backend takes the following configuration options:
|
||||
|
||||
| Name | Type | Example | Description |
|
||||
| :------- | :----- | :----------------- | :-------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `bucket` | string | `anubis` | The bbolt bucket that Anubis should place all its data into. If this is not set, then Anubis will default to the bucket `anubis`. |
|
||||
| `path` | path | `/data/anubis.bdb` | The filesystem path for the Anubis bbolt database. Anubis requires write access to the folder containing the bbolt database. |
|
||||
| Name | Type | Example | Description |
|
||||
| :----- | :--- | :----------------- | :--------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `path` | path | `/data/anubis.bdb` | The filesystem path for the Anubis bbolt database. Anubis requires write access to the folder containing the bbolt database. |
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -2,7 +2,6 @@ package bbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
@ -12,52 +11,85 @@ import (
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Sentinel error values used for testing and in admin-visible error messages.
|
||||
var (
|
||||
ErrBucketDoesNotExist = errors.New("bbolt: bucket does not exist")
|
||||
ErrNotExists = errors.New("bbolt: value does not exist in store")
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Data []byte `json:"data"`
|
||||
Expires time.Time `json:"expires"`
|
||||
}
|
||||
|
||||
// Store implements store.Interface backed by bbolt[1].
|
||||
//
|
||||
// In essence, bbolt is a hierarchical key/value store with a twist: every value
|
||||
// needs to belong to a bucket. Buckets can contain an infinite number of
|
||||
// buckets. As such, Anubis nests values in buckets. Each value in the store
|
||||
// is given its own bucket with two keys:
|
||||
//
|
||||
// 1. data - The raw data, usually in JSON
|
||||
// 2. expiry - The expiry time formatted as a time.RFC3339Nano timestamp string
|
||||
//
|
||||
// When Anubis stores a new bit of data, it creates a new bucket for that value.
|
||||
// This allows the cleanup phase to iterate over every bucket in the database and
|
||||
// only scan the expiry times without having to decode the entire record.
|
||||
//
|
||||
// bbolt is not suitable for environments where multiple instance of Anubis need
|
||||
// to read from and write to the same backend store. For that, use the valkey
|
||||
// storage backend.
|
||||
//
|
||||
// [1]: https://github.com/etcd-io/bbolt
|
||||
type Store struct {
|
||||
bucket []byte
|
||||
bdb *bbolt.DB
|
||||
bdb *bbolt.DB
|
||||
}
|
||||
|
||||
// Delete a key from the datastore. If the key does not exist, return an error.
|
||||
func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
if bkt.Get([]byte(key)) == nil {
|
||||
if tx.Bucket([]byte(key)) == nil {
|
||||
return fmt.Errorf("%w: %q", ErrNotExists, key)
|
||||
}
|
||||
|
||||
return bkt.Delete([]byte(key))
|
||||
return tx.DeleteBucket([]byte(key))
|
||||
})
|
||||
}
|
||||
|
||||
// Get a value from the datastore.
|
||||
//
|
||||
// Because each value is stored in its own bucket with data and expiry keys,
|
||||
// two get operations are required:
|
||||
//
|
||||
// 1. Get the expiry key, parse as time.RFC3339Nano. If the key has expired, run deletion in the background and return a "key not found" error.
|
||||
// 2. Get the data key, copy into the result byteslice, return it.
|
||||
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
var i Item
|
||||
var result []byte
|
||||
|
||||
if err := s.bdb.View(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
bucketData := bkt.Get([]byte(key))
|
||||
if bucketData == nil {
|
||||
itemBucket := tx.Bucket([]byte(key))
|
||||
if itemBucket == nil {
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(bucketData, &i); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantDecode, err)
|
||||
expiryStr := itemBucket.Get([]byte("expiry"))
|
||||
if expiryStr == nil {
|
||||
return fmt.Errorf("[unexpected] %w: %q (expiry is nil)", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
expiry, err := time.Parse(time.RFC3339Nano, string(expiryStr))
|
||||
if err != nil {
|
||||
return fmt.Errorf("[unexpected] %w: %w", store.ErrCantDecode, err)
|
||||
}
|
||||
|
||||
if time.Now().After(expiry) {
|
||||
go s.Delete(context.Background(), key)
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
dataStr := itemBucket.Get([]byte("data"))
|
||||
if dataStr == nil {
|
||||
return fmt.Errorf("[unexpected] %w: %q (data is nil)", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
result = make([]byte, len(dataStr))
|
||||
if n := copy(result, dataStr); n != len(dataStr) {
|
||||
return fmt.Errorf("[unexpected] %w: %d bytes copied of %d", store.ErrCantDecode, n, len(dataStr))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -65,32 +97,28 @@ func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if time.Now().After(i.Expires) {
|
||||
go s.Delete(context.Background(), key)
|
||||
return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return i.Data, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Set a value into the store with a given expiry.
|
||||
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
i := Item{
|
||||
Data: value,
|
||||
Expires: time.Now().Add(expiry),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(i)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantEncode, err)
|
||||
}
|
||||
expires := time.Now().Add(expiry)
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
valueBkt, err := tx.CreateBucketIfNotExists([]byte(key))
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w: %q (create bucket)", store.ErrCantEncode, err, key)
|
||||
}
|
||||
|
||||
return bkt.Put([]byte(key), data)
|
||||
if err := valueBkt.Put([]byte("expiry"), []byte(expires.Format(time.RFC3339Nano))); err != nil {
|
||||
return fmt.Errorf("%w: %q (expiry)", store.ErrCantEncode, key)
|
||||
}
|
||||
|
||||
if err := valueBkt.Put([]byte("data"), value); err != nil {
|
||||
return fmt.Errorf("%w: %q (data)", store.ErrCantEncode, key)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -98,31 +126,28 @@ func (s *Store) cleanup(ctx context.Context) error {
|
||||
now := time.Now()
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("cache bucket %q does not exist", string(s.bucket))
|
||||
}
|
||||
return tx.ForEach(func(key []byte, valueBkt *bbolt.Bucket) error {
|
||||
var expiry time.Time
|
||||
var err error
|
||||
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var i Item
|
||||
|
||||
data := bkt.Get(k)
|
||||
if data == nil {
|
||||
return fmt.Errorf("%s in Cache bucket does not exist???", string(k))
|
||||
expiryStr := valueBkt.Get([]byte("expiry"))
|
||||
if expiryStr == nil {
|
||||
slog.Warn("while running cleanup, expiry is not set somehow, file a bug?", "key", string(key))
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &i); err != nil {
|
||||
return fmt.Errorf("can't unmarshal data at key %s: %w", string(k), err)
|
||||
expiry, err = time.Parse(time.RFC3339Nano, string(expiryStr))
|
||||
if err != nil {
|
||||
return fmt.Errorf("[unexpected] %w in bucket %q: %w", store.ErrCantDecode, string(key), err)
|
||||
}
|
||||
|
||||
if now.After(i.Expires) {
|
||||
return bkt.Delete(k)
|
||||
if now.After(expiry) {
|
||||
return valueBkt.DeleteBucket(key)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) cleanupThread(ctx context.Context) {
|
||||
|
@ -12,8 +12,7 @@ func TestImpl(t *testing.T) {
|
||||
path := filepath.Join(t.TempDir(), "db")
|
||||
t.Log(path)
|
||||
data, err := json.Marshal(Config{
|
||||
Path: path,
|
||||
Bucket: "anubis",
|
||||
Path: path,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -21,8 +21,12 @@ func init() {
|
||||
store.Register("bbolt", Factory{})
|
||||
}
|
||||
|
||||
// Factory builds new instances of the bbolt storage backend according to
|
||||
// configuration passed via a json.RawMessage.
|
||||
type Factory struct{}
|
||||
|
||||
// Build parses and validates the bbolt storage backend Config and creates
|
||||
// a new instance of it.
|
||||
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
@ -33,28 +37,13 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if config.Bucket == "" {
|
||||
config.Bucket = "anubis"
|
||||
}
|
||||
|
||||
bdb, err := bbolt.Open(config.Path, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't open bbolt database %s: %w", config.Path, err)
|
||||
}
|
||||
|
||||
if err := bdb.Update(func(tx *bbolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(config.Bucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("can't create bbolt bucket %q: %w", config.Bucket, err)
|
||||
}
|
||||
|
||||
result := &Store{
|
||||
bdb: bdb,
|
||||
bucket: []byte(config.Bucket),
|
||||
bdb: bdb,
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
@ -62,6 +51,8 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Valid parses and validates the bbolt store Config or returns
|
||||
// an error.
|
||||
func (Factory) Valid(data json.RawMessage) error {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
@ -75,11 +66,13 @@ func (Factory) Valid(data json.RawMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Config is the bbolt storage backend configuration.
|
||||
type Config struct {
|
||||
Path string `json:"path"`
|
||||
Bucket string `json:"bucket,omitempty"`
|
||||
// Path is the filesystem path of the database. The folder must be writable to Anubis.
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
// Valid validates the configuration including checking if its containing folder is writable.
|
||||
func (c Config) Valid() error {
|
||||
var errs []error
|
||||
|
||||
@ -90,6 +83,7 @@ func (c Config) Valid() error {
|
||||
if err := os.WriteFile(filepath.Join(dir, ".test-file"), []byte(""), 0600); err != nil {
|
||||
errs = append(errs, ErrCantWriteToPath)
|
||||
}
|
||||
os.Remove(filepath.Join(dir, ".test-file"))
|
||||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
|
@ -38,7 +38,9 @@ func Common(t *testing.T, f store.Factory, config json.RawMessage) {
|
||||
|
||||
val, err := s.Get(t.Context(), t.Name())
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
t.Errorf("wanted %s to exist in store but it does not", t.Name())
|
||||
t.Errorf("wanted %s to exist in store but it does not: %v", t.Name(), err)
|
||||
} else if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(val, []byte(t.Name())) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user