From 87c5c4297284791ca60ee9eccc4d685b05013272 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Wed, 1 Mar 2023 09:44:54 +0000 Subject: [PATCH] [chore/performance] simplify storage driver to use storage.Storage directly (#1576) * simply use storage.Storage, removing wrapping KVStore as we don't need KV store locking functionality Signed-off-by: kim * fix missing unwrapped function Signed-off-by: kim * add code comment Signed-off-by: kim * linter, please take my offering in peace Signed-off-by: kim --------- Signed-off-by: kim --- internal/api/client/media/mediacreate_test.go | 48 ++++++------- internal/media/manager_test.go | 2 - internal/media/prune.go | 69 ++++++++----------- internal/storage/storage.go | 49 +++++++++++-- testrig/storage.go | 24 ++----- 5 files changed, 97 insertions(+), 95 deletions(-) diff --git a/internal/api/client/media/mediacreate_test.go b/internal/api/client/media/mediacreate_test.go index aaace7a61..caa40b061 100644 --- a/internal/api/client/media/mediacreate_test.go +++ b/internal/api/client/media/mediacreate_test.go @@ -136,15 +136,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() { ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"]) // see what's in storage *before* the request - storageKeysBeforeRequest := []string{} - iter, err := suite.storage.KVStore.Iterator(context.Background(), nil) - if err != nil { + var storageKeysBeforeRequest []string + if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + storageKeysBeforeRequest = append(storageKeysBeforeRequest, key) + return nil + }); err != nil { panic(err) } - for iter.Next() { - storageKeysBeforeRequest = append(storageKeysBeforeRequest, iter.Key()) - } - iter.Release() // create the request buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{ @@ -163,15 +161,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() { suite.mediaModule.MediaCreatePOSTHandler(ctx) // check what's in storage *after* the request - storageKeysAfterRequest := []string{} - iter, err = suite.storage.KVStore.Iterator(context.Background(), nil) - if err != nil { + var storageKeysAfterRequest []string + if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + storageKeysAfterRequest = append(storageKeysAfterRequest, key) + return nil + }); err != nil { panic(err) } - for iter.Next() { - storageKeysAfterRequest = append(storageKeysAfterRequest, iter.Key()) - } - iter.Release() // check response suite.EqualValues(http.StatusOK, recorder.Code) @@ -225,15 +221,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() { ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"]) // see what's in storage *before* the request - storageKeysBeforeRequest := []string{} - iter, err := suite.storage.KVStore.Iterator(context.Background(), nil) - if err != nil { + var storageKeysBeforeRequest []string + if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + storageKeysBeforeRequest = append(storageKeysBeforeRequest, key) + return nil + }); err != nil { panic(err) } - for iter.Next() { - storageKeysBeforeRequest = append(storageKeysBeforeRequest, iter.Key()) - } - iter.Release() // create the request buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{ @@ -252,15 +246,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() { suite.mediaModule.MediaCreatePOSTHandler(ctx) // check what's in storage *after* the request - storageKeysAfterRequest := []string{} - iter, err = suite.storage.KVStore.Iterator(context.Background(), nil) - if err != nil { + var storageKeysAfterRequest []string + if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + storageKeysAfterRequest = append(storageKeysAfterRequest, key) + return nil + }); err != nil { panic(err) } - for iter.Next() { - storageKeysAfterRequest = append(storageKeysAfterRequest, iter.Key()) - } - iter.Release() // check response suite.EqualValues(http.StatusOK, recorder.Code) diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go index f4dd8dac7..683f9147a 100644 --- a/internal/media/manager_test.go +++ b/internal/media/manager_test.go @@ -28,7 +28,6 @@ import ( "testing" "time" - "codeberg.org/gruf/go-store/v2/kv" "codeberg.org/gruf/go-store/v2/storage" "github.com/stretchr/testify/suite" gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" @@ -1196,7 +1195,6 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() { defer state.Workers.Stop() storage := >sstorage.Driver{ - KVStore: kv.New(disk), Storage: disk, } state.Storage = storage diff --git a/internal/media/prune.go b/internal/media/prune.go index 975a82e33..9e73fd2a3 100644 --- a/internal/media/prune.go +++ b/internal/media/prune.go @@ -154,55 +154,41 @@ func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) } func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) { - // keys in storage will look like the following: - // `[ACCOUNT_ID]/[MEDIA_TYPE]/[MEDIA_SIZE]/[MEDIA_ID].[EXTENSION]` - // We can filter out keys we're not interested in by - // matching through a regex. - var matchCount int - match := func(storageKey string) bool { - if regexes.FilePath.MatchString(storageKey) { - matchCount++ - return true - } - return false - } - - iterator, err := m.state.Storage.Iterator(ctx, match) // make sure this iterator is always released - if err != nil { - return 0, fmt.Errorf("PruneOrphaned: error getting storage iterator: %w", err) - } - - // Ensure we have some keys, and also advance - // the iterator to the first non-empty key. - if !iterator.Next() { - iterator.Release() - return 0, nil // nothing else to do here - } - - // Emojis are stored under the instance account, - // so we need the ID of the instance account for - // the next part. + // Emojis are stored under the instance account, so we + // need the ID of the instance account for the next part. instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "") if err != nil { - iterator.Release() return 0, fmt.Errorf("PruneOrphaned: error getting instance account: %w", err) } + instanceAccountID := instanceAccount.ID - // For each key in the iterator, check if entry is orphaned. - orphanedKeys := make([]string, 0, matchCount) - for key := iterator.Key(); iterator.Next(); key = iterator.Key() { + var orphanedKeys []string + + // Keys in storage will look like the following format: + // `[ACCOUNT_ID]/[MEDIA_TYPE]/[MEDIA_SIZE]/[MEDIA_ID].[EXTENSION]` + // We can filter out keys we're not interested in by matching through a regex. + if err := m.state.Storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + if !regexes.FilePath.MatchString(key) { + // This is not our expected key format. + return nil + } + + // Check whether this storage entry is orphaned. orphaned, err := m.orphaned(ctx, key, instanceAccountID) if err != nil { - iterator.Release() - return 0, fmt.Errorf("PruneOrphaned: checking orphaned status: %w", err) + return fmt.Errorf("error checking orphaned status: %w", err) } if orphaned { + // Add this orphaned entry to list of keys. orphanedKeys = append(orphanedKeys, key) } + + return nil + }); err != nil { + return 0, fmt.Errorf("PruneOrphaned: error walking keys: %w", err) } - iterator.Release() totalPruned := len(orphanedKeys) @@ -211,9 +197,8 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) { return totalPruned, nil } - // This is not a drill! - // We have to delete stuff! - return totalPruned, m.removeFiles(ctx, orphanedKeys...) + // This is not a drill! We have to delete stuff! + return m.removeFiles(ctx, orphanedKeys...) } func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) { @@ -330,7 +315,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) { */ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error { - if err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil { + if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil { return err } @@ -339,7 +324,7 @@ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.Med } func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error { - if err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil { + if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil { return err } @@ -350,7 +335,7 @@ func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.Me return m.state.DB.UpdateByID(ctx, attachment, attachment.ID, "updated_at", "cached") } -func (m *manager) removeFiles(ctx context.Context, keys ...string) error { +func (m *manager) removeFiles(ctx context.Context, keys ...string) (int, error) { errs := make(gtserror.MultiError, 0, len(keys)) for _, key := range keys { @@ -359,5 +344,5 @@ func (m *manager) removeFiles(ctx context.Context, keys ...string) error { } } - return errs.Combine() + return len(keys) - len(errs), errs.Combine() } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 00dcc088c..d9fc6d930 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -21,6 +21,7 @@ package storage import ( "context" "fmt" + "io" "mime" "net/url" "path" @@ -28,7 +29,6 @@ import ( "codeberg.org/gruf/go-bytesize" "codeberg.org/gruf/go-cache/v3/ttl" - "codeberg.org/gruf/go-store/v2/kv" "codeberg.org/gruf/go-store/v2/storage" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -54,7 +54,6 @@ var ErrAlreadyExists = storage.ErrAlreadyExists // Driver wraps a kv.KVStore to also provide S3 presigned GET URLs. type Driver struct { // Underlying storage - *kv.KVStore Storage storage.Storage // S3-only parameters @@ -63,6 +62,50 @@ type Driver struct { PresignedCache *ttl.Cache[string, PresignedURL] } +// Get returns the byte value for key in storage. +func (d *Driver) Get(ctx context.Context, key string) ([]byte, error) { + return d.Storage.ReadBytes(ctx, key) +} + +// GetStream returns an io.ReadCloser for the value bytes at key in the storage. +func (d *Driver) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + return d.Storage.ReadStream(ctx, key) +} + +// Put writes the supplied value bytes at key in the storage +func (d *Driver) Put(ctx context.Context, key string, value []byte) (int, error) { + return d.Storage.WriteBytes(ctx, key, value) +} + +// PutStream writes the bytes from supplied reader at key in the storage +func (d *Driver) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) { + return d.Storage.WriteStream(ctx, key, r) +} + +// Remove attempts to remove the supplied key (and corresponding value) from storage. +func (d *Driver) Delete(ctx context.Context, key string) error { + return d.Storage.Remove(ctx, key) +} + +// Has checks if the supplied key is in the storage. +func (d *Driver) Has(ctx context.Context, key string) (bool, error) { + return d.Storage.Stat(ctx, key) +} + +// WalkKeys walks the keys in the storage. +func (d *Driver) WalkKeys(ctx context.Context, walk func(context.Context, string) error) error { + return d.Storage.WalkKeys(ctx, storage.WalkKeysOptions{ + WalkFn: func(ctx context.Context, entry storage.Entry) error { + return walk(ctx, entry.Key) + }, + }) +} + +// Close will close the storage, releasing any file locks. +func (d *Driver) Close() error { + return d.Storage.Close() +} + // URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled. func (d *Driver) URL(ctx context.Context, key string) *PresignedURL { // Check whether S3 *without* proxying is enabled @@ -128,7 +171,6 @@ func NewFileStorage() (*Driver, error) { } return &Driver{ - KVStore: kv.New(disk), Storage: disk, }, nil } @@ -163,7 +205,6 @@ func NewS3Storage() (*Driver, error) { presignedCache.Start(urlCacheExpiryFrequency) return &Driver{ - KVStore: kv.New(s3), Proxy: config.GetStorageS3Proxy(), Bucket: config.GetStorageS3BucketName(), Storage: s3, diff --git a/testrig/storage.go b/testrig/storage.go index 5694b3ab6..4bbf02696 100644 --- a/testrig/storage.go +++ b/testrig/storage.go @@ -24,7 +24,6 @@ import ( "os" "path" - "codeberg.org/gruf/go-store/v2/kv" "codeberg.org/gruf/go-store/v2/storage" gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage" ) @@ -33,7 +32,6 @@ import ( func NewInMemoryStorage() *gtsstorage.Driver { storage := storage.OpenMemory(200, false) return >sstorage.Driver{ - KVStore: kv.New(storage), Storage: storage, } } @@ -95,30 +93,18 @@ func StandardStorageSetup(storage *gtsstorage.Driver, relativePath string) { } } -// StandardStorageTeardown deletes everything in storage so that it's clean for -// the next test -// nolint:gocritic // complains about the type switch, but it's the cleanest solution +// StandardStorageTeardown deletes everything in storage so that it's clean for the next test. func StandardStorageTeardown(storage *gtsstorage.Driver) { defer os.RemoveAll(path.Join(os.TempDir(), "gotosocial")) - // Open a storage iterator - iter, err := storage.Iterator(context.Background(), nil) - if err != nil { - panic(err) - } - var keys []string - for iter.Next() { - // Collate all of the storage keys - keys = append(keys, iter.Key()) - } - - // Done with iter - iter.Release() + _ = storage.WalkKeys(context.Background(), func(ctx context.Context, key string) error { + keys = append(keys, key) + return nil + }) for _, key := range keys { - // Ignore errors, we just want to attempt delete all _ = storage.Delete(context.Background(), key) } }