[chore/performance] simplify storage driver to use storage.Storage directly (#1576)
* simply use storage.Storage, removing wrapping KVStore as we don't need KV store locking functionality Signed-off-by: kim <grufwub@gmail.com> * fix missing unwrapped function Signed-off-by: kim <grufwub@gmail.com> * add code comment Signed-off-by: kim <grufwub@gmail.com> * linter, please take my offering in peace Signed-off-by: kim <grufwub@gmail.com> --------- Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
parent
e4c5f9adfd
commit
87c5c42972
|
@ -136,15 +136,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() {
|
||||||
ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"])
|
ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"])
|
||||||
|
|
||||||
// see what's in storage *before* the request
|
// see what's in storage *before* the request
|
||||||
storageKeysBeforeRequest := []string{}
|
var storageKeysBeforeRequest []string
|
||||||
iter, err := suite.storage.KVStore.Iterator(context.Background(), nil)
|
if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
|
||||||
if err != nil {
|
storageKeysBeforeRequest = append(storageKeysBeforeRequest, key)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
for iter.Next() {
|
|
||||||
storageKeysBeforeRequest = append(storageKeysBeforeRequest, iter.Key())
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
|
|
||||||
// create the request
|
// create the request
|
||||||
buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{
|
buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{
|
||||||
|
@ -163,15 +161,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() {
|
||||||
suite.mediaModule.MediaCreatePOSTHandler(ctx)
|
suite.mediaModule.MediaCreatePOSTHandler(ctx)
|
||||||
|
|
||||||
// check what's in storage *after* the request
|
// check what's in storage *after* the request
|
||||||
storageKeysAfterRequest := []string{}
|
var storageKeysAfterRequest []string
|
||||||
iter, err = suite.storage.KVStore.Iterator(context.Background(), nil)
|
if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
|
||||||
if err != nil {
|
storageKeysAfterRequest = append(storageKeysAfterRequest, key)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
for iter.Next() {
|
|
||||||
storageKeysAfterRequest = append(storageKeysAfterRequest, iter.Key())
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
|
|
||||||
// check response
|
// check response
|
||||||
suite.EqualValues(http.StatusOK, recorder.Code)
|
suite.EqualValues(http.StatusOK, recorder.Code)
|
||||||
|
@ -225,15 +221,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() {
|
||||||
ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"])
|
ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"])
|
||||||
|
|
||||||
// see what's in storage *before* the request
|
// see what's in storage *before* the request
|
||||||
storageKeysBeforeRequest := []string{}
|
var storageKeysBeforeRequest []string
|
||||||
iter, err := suite.storage.KVStore.Iterator(context.Background(), nil)
|
if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
|
||||||
if err != nil {
|
storageKeysBeforeRequest = append(storageKeysBeforeRequest, key)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
for iter.Next() {
|
|
||||||
storageKeysBeforeRequest = append(storageKeysBeforeRequest, iter.Key())
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
|
|
||||||
// create the request
|
// create the request
|
||||||
buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{
|
buf, w, err := testrig.CreateMultipartFormData("file", "../../../../testrig/media/test-jpeg.jpg", map[string]string{
|
||||||
|
@ -252,15 +246,13 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() {
|
||||||
suite.mediaModule.MediaCreatePOSTHandler(ctx)
|
suite.mediaModule.MediaCreatePOSTHandler(ctx)
|
||||||
|
|
||||||
// check what's in storage *after* the request
|
// check what's in storage *after* the request
|
||||||
storageKeysAfterRequest := []string{}
|
var storageKeysAfterRequest []string
|
||||||
iter, err = suite.storage.KVStore.Iterator(context.Background(), nil)
|
if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
|
||||||
if err != nil {
|
storageKeysAfterRequest = append(storageKeysAfterRequest, key)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
for iter.Next() {
|
|
||||||
storageKeysAfterRequest = append(storageKeysAfterRequest, iter.Key())
|
|
||||||
}
|
|
||||||
iter.Release()
|
|
||||||
|
|
||||||
// check response
|
// check response
|
||||||
suite.EqualValues(http.StatusOK, recorder.Code)
|
suite.EqualValues(http.StatusOK, recorder.Code)
|
||||||
|
|
|
@ -28,7 +28,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
|
||||||
"codeberg.org/gruf/go-store/v2/storage"
|
"codeberg.org/gruf/go-store/v2/storage"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||||
|
@ -1196,7 +1195,6 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
|
||||||
defer state.Workers.Stop()
|
defer state.Workers.Stop()
|
||||||
|
|
||||||
storage := >sstorage.Driver{
|
storage := >sstorage.Driver{
|
||||||
KVStore: kv.New(disk),
|
|
||||||
Storage: disk,
|
Storage: disk,
|
||||||
}
|
}
|
||||||
state.Storage = storage
|
state.Storage = storage
|
||||||
|
|
|
@ -154,55 +154,41 @@ func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
|
func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
|
||||||
// keys in storage will look like the following:
|
// Emojis are stored under the instance account, so we
|
||||||
// `[ACCOUNT_ID]/[MEDIA_TYPE]/[MEDIA_SIZE]/[MEDIA_ID].[EXTENSION]`
|
// need the ID of the instance account for the next part.
|
||||||
// We can filter out keys we're not interested in by
|
|
||||||
// matching through a regex.
|
|
||||||
var matchCount int
|
|
||||||
match := func(storageKey string) bool {
|
|
||||||
if regexes.FilePath.MatchString(storageKey) {
|
|
||||||
matchCount++
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
iterator, err := m.state.Storage.Iterator(ctx, match) // make sure this iterator is always released
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("PruneOrphaned: error getting storage iterator: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure we have some keys, and also advance
|
|
||||||
// the iterator to the first non-empty key.
|
|
||||||
if !iterator.Next() {
|
|
||||||
iterator.Release()
|
|
||||||
return 0, nil // nothing else to do here
|
|
||||||
}
|
|
||||||
|
|
||||||
// Emojis are stored under the instance account,
|
|
||||||
// so we need the ID of the instance account for
|
|
||||||
// the next part.
|
|
||||||
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
|
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
iterator.Release()
|
|
||||||
return 0, fmt.Errorf("PruneOrphaned: error getting instance account: %w", err)
|
return 0, fmt.Errorf("PruneOrphaned: error getting instance account: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
instanceAccountID := instanceAccount.ID
|
instanceAccountID := instanceAccount.ID
|
||||||
|
|
||||||
// For each key in the iterator, check if entry is orphaned.
|
var orphanedKeys []string
|
||||||
orphanedKeys := make([]string, 0, matchCount)
|
|
||||||
for key := iterator.Key(); iterator.Next(); key = iterator.Key() {
|
// Keys in storage will look like the following format:
|
||||||
|
// `[ACCOUNT_ID]/[MEDIA_TYPE]/[MEDIA_SIZE]/[MEDIA_ID].[EXTENSION]`
|
||||||
|
// We can filter out keys we're not interested in by matching through a regex.
|
||||||
|
if err := m.state.Storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
|
||||||
|
if !regexes.FilePath.MatchString(key) {
|
||||||
|
// This is not our expected key format.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether this storage entry is orphaned.
|
||||||
orphaned, err := m.orphaned(ctx, key, instanceAccountID)
|
orphaned, err := m.orphaned(ctx, key, instanceAccountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
iterator.Release()
|
return fmt.Errorf("error checking orphaned status: %w", err)
|
||||||
return 0, fmt.Errorf("PruneOrphaned: checking orphaned status: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if orphaned {
|
if orphaned {
|
||||||
|
// Add this orphaned entry to list of keys.
|
||||||
orphanedKeys = append(orphanedKeys, key)
|
orphanedKeys = append(orphanedKeys, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return 0, fmt.Errorf("PruneOrphaned: error walking keys: %w", err)
|
||||||
}
|
}
|
||||||
iterator.Release()
|
|
||||||
|
|
||||||
totalPruned := len(orphanedKeys)
|
totalPruned := len(orphanedKeys)
|
||||||
|
|
||||||
|
@ -211,9 +197,8 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
|
||||||
return totalPruned, nil
|
return totalPruned, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is not a drill!
|
// This is not a drill! We have to delete stuff!
|
||||||
// We have to delete stuff!
|
return m.removeFiles(ctx, orphanedKeys...)
|
||||||
return totalPruned, m.removeFiles(ctx, orphanedKeys...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
|
func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
|
||||||
|
@ -330,7 +315,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
|
func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
|
||||||
if err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
|
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,7 +324,7 @@ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.Med
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
|
func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
|
||||||
if err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
|
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -350,7 +335,7 @@ func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.Me
|
||||||
return m.state.DB.UpdateByID(ctx, attachment, attachment.ID, "updated_at", "cached")
|
return m.state.DB.UpdateByID(ctx, attachment, attachment.ID, "updated_at", "cached")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) removeFiles(ctx context.Context, keys ...string) error {
|
func (m *manager) removeFiles(ctx context.Context, keys ...string) (int, error) {
|
||||||
errs := make(gtserror.MultiError, 0, len(keys))
|
errs := make(gtserror.MultiError, 0, len(keys))
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
@ -359,5 +344,5 @@ func (m *manager) removeFiles(ctx context.Context, keys ...string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return errs.Combine()
|
return len(keys) - len(errs), errs.Combine()
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"mime"
|
"mime"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
@ -28,7 +29,6 @@ import (
|
||||||
|
|
||||||
"codeberg.org/gruf/go-bytesize"
|
"codeberg.org/gruf/go-bytesize"
|
||||||
"codeberg.org/gruf/go-cache/v3/ttl"
|
"codeberg.org/gruf/go-cache/v3/ttl"
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
|
||||||
"codeberg.org/gruf/go-store/v2/storage"
|
"codeberg.org/gruf/go-store/v2/storage"
|
||||||
"github.com/minio/minio-go/v7"
|
"github.com/minio/minio-go/v7"
|
||||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||||
|
@ -54,7 +54,6 @@ var ErrAlreadyExists = storage.ErrAlreadyExists
|
||||||
// Driver wraps a kv.KVStore to also provide S3 presigned GET URLs.
|
// Driver wraps a kv.KVStore to also provide S3 presigned GET URLs.
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
// Underlying storage
|
// Underlying storage
|
||||||
*kv.KVStore
|
|
||||||
Storage storage.Storage
|
Storage storage.Storage
|
||||||
|
|
||||||
// S3-only parameters
|
// S3-only parameters
|
||||||
|
@ -63,6 +62,50 @@ type Driver struct {
|
||||||
PresignedCache *ttl.Cache[string, PresignedURL]
|
PresignedCache *ttl.Cache[string, PresignedURL]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns the byte value for key in storage.
|
||||||
|
func (d *Driver) Get(ctx context.Context, key string) ([]byte, error) {
|
||||||
|
return d.Storage.ReadBytes(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStream returns an io.ReadCloser for the value bytes at key in the storage.
|
||||||
|
func (d *Driver) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
|
||||||
|
return d.Storage.ReadStream(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put writes the supplied value bytes at key in the storage
|
||||||
|
func (d *Driver) Put(ctx context.Context, key string, value []byte) (int, error) {
|
||||||
|
return d.Storage.WriteBytes(ctx, key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutStream writes the bytes from supplied reader at key in the storage
|
||||||
|
func (d *Driver) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) {
|
||||||
|
return d.Storage.WriteStream(ctx, key, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove attempts to remove the supplied key (and corresponding value) from storage.
|
||||||
|
func (d *Driver) Delete(ctx context.Context, key string) error {
|
||||||
|
return d.Storage.Remove(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has checks if the supplied key is in the storage.
|
||||||
|
func (d *Driver) Has(ctx context.Context, key string) (bool, error) {
|
||||||
|
return d.Storage.Stat(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkKeys walks the keys in the storage.
|
||||||
|
func (d *Driver) WalkKeys(ctx context.Context, walk func(context.Context, string) error) error {
|
||||||
|
return d.Storage.WalkKeys(ctx, storage.WalkKeysOptions{
|
||||||
|
WalkFn: func(ctx context.Context, entry storage.Entry) error {
|
||||||
|
return walk(ctx, entry.Key)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close will close the storage, releasing any file locks.
|
||||||
|
func (d *Driver) Close() error {
|
||||||
|
return d.Storage.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled.
|
// URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled.
|
||||||
func (d *Driver) URL(ctx context.Context, key string) *PresignedURL {
|
func (d *Driver) URL(ctx context.Context, key string) *PresignedURL {
|
||||||
// Check whether S3 *without* proxying is enabled
|
// Check whether S3 *without* proxying is enabled
|
||||||
|
@ -128,7 +171,6 @@ func NewFileStorage() (*Driver, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Driver{
|
return &Driver{
|
||||||
KVStore: kv.New(disk),
|
|
||||||
Storage: disk,
|
Storage: disk,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -163,7 +205,6 @@ func NewS3Storage() (*Driver, error) {
|
||||||
presignedCache.Start(urlCacheExpiryFrequency)
|
presignedCache.Start(urlCacheExpiryFrequency)
|
||||||
|
|
||||||
return &Driver{
|
return &Driver{
|
||||||
KVStore: kv.New(s3),
|
|
||||||
Proxy: config.GetStorageS3Proxy(),
|
Proxy: config.GetStorageS3Proxy(),
|
||||||
Bucket: config.GetStorageS3BucketName(),
|
Bucket: config.GetStorageS3BucketName(),
|
||||||
Storage: s3,
|
Storage: s3,
|
||||||
|
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
"codeberg.org/gruf/go-store/v2/kv"
|
|
||||||
"codeberg.org/gruf/go-store/v2/storage"
|
"codeberg.org/gruf/go-store/v2/storage"
|
||||||
gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage"
|
gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage"
|
||||||
)
|
)
|
||||||
|
@ -33,7 +32,6 @@ import (
|
||||||
func NewInMemoryStorage() *gtsstorage.Driver {
|
func NewInMemoryStorage() *gtsstorage.Driver {
|
||||||
storage := storage.OpenMemory(200, false)
|
storage := storage.OpenMemory(200, false)
|
||||||
return >sstorage.Driver{
|
return >sstorage.Driver{
|
||||||
KVStore: kv.New(storage),
|
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -95,30 +93,18 @@ func StandardStorageSetup(storage *gtsstorage.Driver, relativePath string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StandardStorageTeardown deletes everything in storage so that it's clean for
|
// StandardStorageTeardown deletes everything in storage so that it's clean for the next test.
|
||||||
// the next test
|
|
||||||
// nolint:gocritic // complains about the type switch, but it's the cleanest solution
|
|
||||||
func StandardStorageTeardown(storage *gtsstorage.Driver) {
|
func StandardStorageTeardown(storage *gtsstorage.Driver) {
|
||||||
defer os.RemoveAll(path.Join(os.TempDir(), "gotosocial"))
|
defer os.RemoveAll(path.Join(os.TempDir(), "gotosocial"))
|
||||||
|
|
||||||
// Open a storage iterator
|
|
||||||
iter, err := storage.Iterator(context.Background(), nil)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var keys []string
|
var keys []string
|
||||||
|
|
||||||
for iter.Next() {
|
_ = storage.WalkKeys(context.Background(), func(ctx context.Context, key string) error {
|
||||||
// Collate all of the storage keys
|
keys = append(keys, key)
|
||||||
keys = append(keys, iter.Key())
|
return nil
|
||||||
}
|
})
|
||||||
|
|
||||||
// Done with iter
|
|
||||||
iter.Release()
|
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
// Ignore errors, we just want to attempt delete all
|
|
||||||
_ = storage.Delete(context.Background(), key)
|
_ = storage.Delete(context.Background(), key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue