mirror of
1
Fork 0

[chore] tidy up media manager, add calling func to errors, build-script improvements (#1835)

* media manager tidy-up: de-interface and remove unused PostDataFunc

Signed-off-by: kim <grufwub@gmail.com>

* remove last traces of media.Manager being an interface

Signed-off-by: kim <grufwub@gmail.com>

* update error to provide caller, allow tuneable via build tags

Signed-off-by: kim <grufwub@gmail.com>

* remove kim-specific build script changes

Signed-off-by: kim <grufwub@gmail.com>

* fix merge conflicts

Signed-off-by: kim <grufwub@gmail.com>

* update build-script to support externally setting build variables

Signed-off-by: kim <grufwub@gmail.com>

---------

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2023-05-28 13:08:35 +01:00 committed by tsmethurst
parent 949a1f5438
commit 94792ff7e8
64 changed files with 444 additions and 392 deletions

View File

@ -31,7 +31,7 @@ import (
type prune struct {
dbService db.DB
storage *gtsstorage.Driver
manager media.Manager
manager *media.Manager
state *state.State
}

View File

@ -43,7 +43,7 @@ type EmojiGetTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -39,7 +39,7 @@ type UserStandardTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -46,7 +46,7 @@ type AuthStandardTestSuite struct {
db db.DB
storage *storage.Driver
state state.State
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -44,7 +44,7 @@ type AccountStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -44,7 +44,7 @@ type AdminStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -50,7 +50,7 @@ type BookmarkTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -37,7 +37,7 @@ type FavouritesStandardTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -42,7 +42,7 @@ type FollowRequestStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -43,7 +43,7 @@ type InstanceStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -52,7 +52,7 @@ type MediaCreateTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
tc typeutils.TypeConverter
oauthServer oauth.Server

View File

@ -52,7 +52,7 @@ type MediaUpdateTestSuite struct {
storage *storage.Driver
federator federation.Federator
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
oauthServer oauth.Server
emailSender email.Sender
processor *processing.Processor

View File

@ -35,7 +35,7 @@ type ReportsStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -43,7 +43,7 @@ type SearchStandardTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -37,7 +37,7 @@ type StatusStandardTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -49,7 +49,7 @@ type StreamingTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -36,7 +36,7 @@ type UserStandardTestSuite struct {
suite.Suite
db db.DB
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -45,7 +45,7 @@ type FileserverTestSuite struct {
federator federation.Federator
tc typeutils.TypeConverter
processor *processing.Processor
mediaManager media.Manager
mediaManager *media.Manager
oauthServer oauth.Server
emailSender email.Sender

View File

@ -39,7 +39,7 @@ type WebfingerStandardTestSuite struct {
db db.DB
state state.State
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
emailSender email.Sender
processor *processing.Processor

View File

@ -21,7 +21,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"time"
@ -31,6 +30,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
@ -93,14 +93,14 @@ func (d *deref) getAccountByURI(ctx context.Context, requestUser string, uri *ur
// Search the database for existing account with ID URI.
account, err = d.state.DB.GetAccountByURI(ctx, uriStr)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by uri: %w", uriStr, err)
return nil, nil, gtserror.Newf("error checking database for account %s by uri: %w", uriStr, err)
}
if account == nil {
// Else, search the database for existing by ID URL.
account, err = d.state.DB.GetAccountByURL(ctx, uriStr)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("GetAccountByURI: error checking database for account %s by url: %w", uriStr, err)
return nil, nil, gtserror.Newf("error checking database for account %s by url: %w", uriStr, err)
}
}
@ -155,7 +155,7 @@ func (d *deref) GetAccountByUsernameDomain(ctx context.Context, requestUser stri
// Search the database for existing account with USERNAME@DOMAIN.
account, err := d.state.DB.GetAccountByUsernameDomain(ctx, username, domain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("GetAccountByUsernameDomain: error checking database for account %s@%s: %w", username, domain, err)
return nil, nil, gtserror.Newf("error checking database for account %s@%s: %w", username, domain, err)
}
if account == nil {
@ -209,7 +209,7 @@ func (d *deref) RefreshAccount(ctx context.Context, requestUser string, account
// Parse the URI from account.
uri, err := url.Parse(account.URI)
if err != nil {
return nil, nil, fmt.Errorf("RefreshAccount: invalid account uri %q: %w", account.URI, err)
return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err)
}
// Try to update + deref existing account model.
@ -249,7 +249,7 @@ func (d *deref) RefreshAccountAsync(ctx context.Context, requestUser string, acc
// Parse the URI from account.
uri, err := url.Parse(account.URI)
if err != nil {
log.Errorf(ctx, "RefreshAccountAsync: invalid account uri %q: %v", account.URI, err)
log.Errorf(ctx, "invalid account uri %q: %v", account.URI, err)
return
}
@ -273,7 +273,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// Pre-fetch a transport for requesting username, used by later deref procedures.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
return nil, nil, fmt.Errorf("enrichAccount: couldn't create transport: %w", err)
return nil, nil, gtserror.Newf("couldn't create transport: %w", err)
}
if account.Username != "" {
@ -282,7 +282,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
if err != nil {
if account.URI == "" {
// this is a new account (to us) with username@domain but failed webfinger, nothing more we can do.
return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichAccount: error webfingering account: %w", err)}
return nil, nil, &ErrNotRetrievable{gtserror.Newf("error webfingering account: %w", err)}
}
// Simply log this error and move on, we already have an account URI.
@ -298,7 +298,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// After webfinger, we now have correct account domain from which we can do a final DB check.
alreadyAccount, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("enrichAccount: db err looking for account again after webfinger: %w", err)
return nil, nil, gtserror.Newf("db err looking for account again after webfinger: %w", err)
}
if alreadyAccount != nil {
@ -318,15 +318,15 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// No URI provided / found, must parse from account.
uri, err = url.Parse(account.URI)
if err != nil {
return nil, nil, fmt.Errorf("enrichAccount: invalid uri %q: %w", account.URI, err)
return nil, nil, gtserror.Newf("invalid uri %q: %w", account.URI, err)
}
}
// Check whether this account URI is a blocked domain / subdomain.
if blocked, err := d.state.DB.IsDomainBlocked(ctx, uri.Host); err != nil {
return nil, nil, fmt.Errorf("enrichAccount: error checking blocked domain: %w", err)
return nil, nil, gtserror.Newf("error checking blocked domain: %w", err)
} else if blocked {
return nil, nil, fmt.Errorf("enrichAccount: %s is blocked", uri.Host)
return nil, nil, gtserror.Newf("%s is blocked", uri.Host)
}
// Mark deref+update handshake start.
@ -341,13 +341,13 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// Dereference latest version of the account.
b, err := tsport.Dereference(ctx, uri)
if err != nil {
return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichAccount: error deferencing %s: %w", uri, err)}
return nil, nil, &ErrNotRetrievable{gtserror.Newf("error deferencing %s: %w", uri, err)}
}
// Attempt to resolve ActivityPub account from data.
apubAcc, err = ap.ResolveAccountable(ctx, b)
if err != nil {
return nil, nil, fmt.Errorf("enrichAccount: error resolving accountable from data for account %s: %w", uri, err)
return nil, nil, gtserror.Newf("error resolving accountable from data for account %s: %w", uri, err)
}
// Convert the dereferenced AP account object to our GTS model.
@ -356,7 +356,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
account.Domain,
)
if err != nil {
return nil, nil, fmt.Errorf("enrichAccount: error converting accountable to gts model for account %s: %w", uri, err)
return nil, nil, gtserror.Newf("error converting accountable to gts model for account %s: %w", uri, err)
}
}
@ -375,7 +375,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// Assume the host from the returned ActivityPub representation.
idProp := apubAcc.GetJSONLDId()
if idProp == nil || !idProp.IsIRI() {
return nil, nil, errors.New("enrichAccount: no id property found on person, or id was not an iri")
return nil, nil, gtserror.New("no id property found on person, or id was not an iri")
}
// Get IRI host value.
@ -471,7 +471,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
}
if err != nil {
return nil, nil, fmt.Errorf("enrichAccount: error putting in database: %w", err)
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
} else {
// Set time of update from the last-fetched date.
@ -483,7 +483,7 @@ func (d *deref) enrichAccount(ctx context.Context, requestUser string, uri *url.
// This is an existing account, update the model in the database.
if err := d.state.DB.UpdateAccount(ctx, latestAcc); err != nil {
return nil, nil, fmt.Errorf("enrichAccount: error updating database: %w", err)
return nil, nil, gtserror.Newf("error updating database: %w", err)
}
}
@ -513,7 +513,7 @@ func (d *deref) fetchRemoteAccountAvatar(ctx context.Context, tsport transport.T
}
// Create new media processing request from the media manager instance.
processing, err = d.mediaManager.PreProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
processing, err = d.mediaManager.PreProcessMedia(ctx, data, accountID, &media.AdditionalMediaInfo{
Avatar: func() *bool { v := true; return &v }(),
RemoteURL: &avatarURL,
})
@ -566,7 +566,7 @@ func (d *deref) fetchRemoteAccountHeader(ctx context.Context, tsport transport.T
}
// Create new media processing request from the media manager instance.
processing, err = d.mediaManager.PreProcessMedia(ctx, data, nil, accountID, &media.AdditionalMediaInfo{
processing, err = d.mediaManager.PreProcessMedia(ctx, data, accountID, &media.AdditionalMediaInfo{
Header: func() *bool { v := true; return &v }(),
RemoteURL: &headerURL,
})
@ -719,7 +719,7 @@ func (d *deref) dereferenceAccountFeatured(ctx context.Context, requestUser stri
// Pre-fetch a transport for requesting username, used by later deref procedures.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
return fmt.Errorf("enrichAccount: couldn't create transport: %w", err)
return gtserror.Newf("couldn't create transport: %w", err)
}
b, err := tsport.Dereference(ctx, uri)
@ -729,32 +729,32 @@ func (d *deref) dereferenceAccountFeatured(ctx context.Context, requestUser stri
m := make(map[string]interface{})
if err := json.Unmarshal(b, &m); err != nil {
return fmt.Errorf("error unmarshalling bytes into json: %w", err)
return gtserror.Newf("error unmarshalling bytes into json: %w", err)
}
t, err := streams.ToType(ctx, m)
if err != nil {
return fmt.Errorf("error resolving json into ap vocab type: %w", err)
return gtserror.Newf("error resolving json into ap vocab type: %w", err)
}
if t.GetTypeName() != ap.ObjectOrderedCollection {
return fmt.Errorf("%s was not an OrderedCollection", uri)
return gtserror.Newf("%s was not an OrderedCollection", uri)
}
collection, ok := t.(vocab.ActivityStreamsOrderedCollection)
if !ok {
return errors.New("couldn't coerce OrderedCollection")
return gtserror.New("couldn't coerce OrderedCollection")
}
items := collection.GetActivityStreamsOrderedItems()
if items == nil {
return errors.New("nil orderedItems")
return gtserror.New("nil orderedItems")
}
// Get previous pinned statuses (we'll need these later).
wasPinned, err := d.state.DB.GetAccountPinnedStatuses(ctx, account.ID)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return fmt.Errorf("error getting account pinned statuses: %w", err)
return gtserror.Newf("error getting account pinned statuses: %w", err)
}
statusURIs := make([]*url.URL, 0, items.Len())

View File

@ -81,7 +81,7 @@ type deref struct {
state *state.State
typeConverter typeutils.TypeConverter
transportController transport.Controller
mediaManager media.Manager
mediaManager *media.Manager
derefAvatars map[string]*media.ProcessingMedia
derefAvatarsMu mutexes.Mutex
derefHeaders map[string]*media.ProcessingMedia
@ -93,7 +93,7 @@ type deref struct {
}
// NewDereferencer returns a Dereferencer initialized with the given parameters.
func NewDereferencer(state *state.State, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager media.Manager) Dereferencer {
func NewDereferencer(state *state.State, typeConverter typeutils.TypeConverter, transportController transport.Controller, mediaManager *media.Manager) Dereferencer {
return &deref{
state: state,
typeConverter: typeConverter,

View File

@ -60,7 +60,7 @@ func (d *deref) GetRemoteEmoji(ctx context.Context, requestingUsername string, r
return t.DereferenceMedia(innerCtx, derefURI)
}
newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, nil, shortcode, id, emojiURI, ai, refresh)
newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, shortcode, id, emojiURI, ai, refresh)
if err != nil {
return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err)
}

View File

@ -45,7 +45,7 @@ func (d *deref) GetRemoteMedia(ctx context.Context, requestingUsername string, a
return t.DereferenceMedia(innerCtx, derefURI)
}
processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai)
if err != nil {
return nil, fmt.Errorf("GetRemoteMedia: error processing attachment: %s", err)
}

View File

@ -20,7 +20,6 @@ package dereferencing
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"time"
@ -28,6 +27,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
@ -83,14 +83,14 @@ func (d *deref) getStatusByURI(ctx context.Context, requestUser string, uri *url
// Search the database for existing status with ID URI.
status, err = d.state.DB.GetStatusByURI(ctx, uriStr)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("GetStatusByURI: error checking database for status %s by uri: %w", uriStr, err)
return nil, nil, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err)
}
if status == nil {
// Else, search the database for existing by ID URL.
status, err = d.state.DB.GetStatusByURL(ctx, uriStr)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
return nil, nil, fmt.Errorf("GetStatusByURI: error checking database for status %s by url: %w", uriStr, err)
return nil, nil, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err)
}
}
@ -138,7 +138,7 @@ func (d *deref) RefreshStatus(ctx context.Context, requestUser string, status *g
// Parse the URI from status.
uri, err := url.Parse(status.URI)
if err != nil {
return nil, nil, fmt.Errorf("RefreshStatus: invalid status uri %q: %w", status.URI, err)
return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err)
}
// Try to update + deref existing status model.
@ -170,7 +170,7 @@ func (d *deref) RefreshStatusAsync(ctx context.Context, requestUser string, stat
// Parse the URI from status.
uri, err := url.Parse(status.URI)
if err != nil {
log.Errorf(ctx, "RefreshStatusAsync: invalid status uri %q: %v", status.URI, err)
log.Errorf(ctx, "invalid status uri %q: %v", status.URI, err)
return
}
@ -192,14 +192,14 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// Pre-fetch a transport for requesting username, used by later dereferencing.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
return nil, nil, fmt.Errorf("enrichStatus: couldn't create transport: %w", err)
return nil, nil, gtserror.Newf("couldn't create transport: %w", err)
}
// Check whether this account URI is a blocked domain / subdomain.
if blocked, err := d.state.DB.IsDomainBlocked(ctx, uri.Host); err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error checking blocked domain: %w", err)
return nil, nil, gtserror.Newf("error checking blocked domain: %w", err)
} else if blocked {
return nil, nil, fmt.Errorf("enrichStatus: %s is blocked", uri.Host)
return nil, nil, gtserror.Newf("%s is blocked", uri.Host)
}
var derefd bool
@ -208,13 +208,13 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// Dereference latest version of the status.
b, err := tsport.Dereference(ctx, uri)
if err != nil {
return nil, nil, &ErrNotRetrievable{fmt.Errorf("enrichStatus: error deferencing %s: %w", uri, err)}
return nil, nil, &ErrNotRetrievable{gtserror.Newf("error deferencing %s: %w", uri, err)}
}
// Attempt to resolve ActivityPub status from data.
apubStatus, err = ap.ResolveStatusable(ctx, b)
if err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error resolving statusable from data for account %s: %w", uri, err)
return nil, nil, gtserror.Newf("error resolving statusable from data for account %s: %w", uri, err)
}
// Mark as deref'd.
@ -224,14 +224,14 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// Get the attributed-to status in order to fetch profile.
attributedTo, err := ap.ExtractAttributedTo(apubStatus)
if err != nil {
return nil, nil, errors.New("enrichStatus: attributedTo was empty")
return nil, nil, gtserror.New("attributedTo was empty")
}
// Ensure we have the author account of the status dereferenced (+ up-to-date).
if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil {
if status.AccountID == "" {
// Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable.
return nil, nil, fmt.Errorf("enrichStatus: failed to dereference status author %s: %w", uri, err)
return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)
}
} else if status.AccountID != "" && status.AccountID != author.ID {
// There already existed an account for this status author, but account ID changed. This shouldn't happen!
@ -247,7 +247,7 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// may contain out-of-date information, convert AP model to our GTS model.
latestStatus, err = d.typeConverter.ASStatusToStatus(ctx, apubStatus)
if err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error converting statusable to gts model for status %s: %w", uri, err)
return nil, nil, gtserror.Newf("error converting statusable to gts model for status %s: %w", uri, err)
}
}
@ -258,7 +258,7 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// Generate new status ID from the provided creation date.
latestStatus.ID, err = id.NewULIDFromTime(latestStatus.CreatedAt)
if err != nil {
return nil, nil, fmt.Errorf("enrichStatus: invalid created at date: %w", err)
return nil, nil, gtserror.Newf("invalid created at date: %w", err)
}
}
@ -268,19 +268,19 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
// Ensure the status' mentions are populated, and pass in existing to check for changes.
if err := d.fetchStatusMentions(ctx, requestUser, status, latestStatus); err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error populating mentions for status %s: %w", uri, err)
return nil, nil, gtserror.Newf("error populating mentions for status %s: %w", uri, err)
}
// TODO: populateStatusTags()
// Ensure the status' media attachments are populated, passing in existing to check for changes.
if err := d.fetchStatusAttachments(ctx, tsport, status, latestStatus); err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error populating attachments for status %s: %w", uri, err)
return nil, nil, gtserror.Newf("error populating attachments for status %s: %w", uri, err)
}
// Ensure the status' emoji attachments are populated, passing in existing to check for changes.
if err := d.fetchStatusEmojis(ctx, requestUser, status, latestStatus); err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error populating emojis for status %s: %w", uri, err)
return nil, nil, gtserror.Newf("error populating emojis for status %s: %w", uri, err)
}
if status.CreatedAt.IsZero() {
@ -297,12 +297,12 @@ func (d *deref) enrichStatus(ctx context.Context, requestUser string, uri *url.U
}
if err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error putting in database: %w", err)
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
} else {
// This is an existing status, update the model in the database.
if err := d.state.DB.UpdateStatus(ctx, latestStatus); err != nil {
return nil, nil, fmt.Errorf("enrichStatus: error updating database: %w", err)
return nil, nil, gtserror.Newf("error updating database: %w", err)
}
}
@ -359,7 +359,7 @@ func (d *deref) fetchStatusMentions(ctx context.Context, requestUser string, exi
// Place the new mention into the database.
if err := d.state.DB.PutMention(ctx, mention); err != nil {
return fmt.Errorf("error putting mention in database: %w", err)
return gtserror.Newf("error putting mention in database: %w", err)
}
// Set the *new* mention and ID.
@ -406,7 +406,7 @@ func (d *deref) fetchStatusAttachments(ctx context.Context, tsport transport.Tra
// Start pre-processing remote media at remote URL.
processing, err := d.mediaManager.PreProcessMedia(ctx, func(ctx context.Context) (io.ReadCloser, int64, error) {
return tsport.DereferenceMedia(ctx, remoteURL)
}, nil, status.AccountID, &media.AdditionalMediaInfo{
}, status.AccountID, &media.AdditionalMediaInfo{
StatusID: &status.ID,
RemoteURL: &placeholder.RemoteURL,
Description: &placeholder.Description,
@ -447,7 +447,7 @@ func (d *deref) fetchStatusEmojis(ctx context.Context, requestUser string, exist
// Fetch the full-fleshed-out emoji objects for our status.
emojis, err := d.populateEmojis(ctx, status.Emojis, requestUser)
if err != nil {
return fmt.Errorf("failed to populate emojis: %w", err)
return gtserror.Newf("failed to populate emojis: %w", err)
}
// Iterate over and get their IDs.

View File

@ -19,13 +19,13 @@ package dereferencing
import (
"context"
"fmt"
"net/url"
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/uris"
@ -39,12 +39,12 @@ const maxIter = 1000
func (d *deref) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) {
// Ensure that ancestors have been fully dereferenced
if err := d.dereferenceStatusAncestors(ctx, username, status); err != nil {
log.Errorf(ctx, "error dereferencing status ancestors: %v", err)
log.Error(ctx, err) // log entry and error will include caller prefixes
}
// Ensure that descendants have been fully dereferenced
if err := d.dereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil {
log.Errorf(ctx, "error dereferencing status descendants: %v", err)
log.Error(ctx, err) // log entry and error will include caller prefixes
}
}
@ -72,7 +72,7 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,
// Parse this status's replied IRI
replyIRI, err := url.Parse(status.InReplyToURI)
if err != nil {
return fmt.Errorf("invalid status InReplyToURI %q: %w", status.InReplyToURI, err)
return gtserror.Newf("invalid status InReplyToURI %q: %w", status.InReplyToURI, err)
}
if replyIRI.Host == config.GetHost() {
@ -81,13 +81,13 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,
// This is our status, extract ID from path
_, id, err := uris.ParseStatusesPath(replyIRI)
if err != nil {
return fmt.Errorf("invalid local status IRI %q: %w", status.InReplyToURI, err)
return gtserror.Newf("invalid local status IRI %q: %w", status.InReplyToURI, err)
}
// Fetch this status from the database
localStatus, err := d.state.DB.GetStatusByID(ctx, id)
if err != nil {
return fmt.Errorf("error fetching local status %q: %w", id, err)
return gtserror.Newf("error fetching local status %q: %w", id, err)
}
// Set the fetched status
@ -102,7 +102,7 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,
replyIRI,
)
if err != nil {
return fmt.Errorf("error fetching remote status %q: %w", status.InReplyToURI, err)
return gtserror.Newf("error fetching remote status %q: %w", status.InReplyToURI, err)
}
// Set the fetched status
@ -110,7 +110,7 @@ func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string,
}
}
return fmt.Errorf("reached %d ancestor iterations for %q", maxIter, ogIRI)
return gtserror.Newf("reached %d ancestor iterations for %q", maxIter, ogIRI)
}
func (d *deref) dereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error {
@ -312,5 +312,5 @@ stackLoop:
}
}
return fmt.Errorf("reached %d descendant iterations for %q", maxIter, ogIRI.String())
return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String())
}

View File

@ -62,13 +62,13 @@ type federator struct {
clock pub.Clock
typeConverter typeutils.TypeConverter
transportController transport.Controller
mediaManager media.Manager
mediaManager *media.Manager
actor pub.FederatingActor
dereferencing.Dereferencer
}
// NewFederator returns a new federator
func NewFederator(state *state.State, federatingDB federatingdb.DB, transportController transport.Controller, typeConverter typeutils.TypeConverter, mediaManager media.Manager) Federator {
func NewFederator(state *state.State, federatingDB federatingdb.DB, transportController transport.Controller, typeConverter typeutils.TypeConverter, mediaManager *media.Manager) Federator {
dereferencer := dereferencing.NewDereferencer(state, typeConverter, transportController, mediaManager)
clock := &Clock{}

View File

@ -41,7 +41,7 @@ type Server interface {
// NewServer returns a new gotosocial server, initialized with the given configuration.
// An error will be returned the caller if something goes wrong during initialization
// eg., no db or storage connection, port for router already in use, etc.
func NewServer(db db.DB, apiRouter router.Router, federator federation.Federator, mediaManager media.Manager) (Server, error) {
func NewServer(db db.DB, apiRouter router.Router, federator federation.Federator, mediaManager *media.Manager) (Server, error) {
return &gotosocial{
db: db,
apiRouter: apiRouter,
@ -55,7 +55,7 @@ type gotosocial struct {
db db.DB
apiRouter router.Router
federator federation.Federator
mediaManager media.Manager
mediaManager *media.Manager
}
// Start starts up the gotosocial server. If something goes wrong

View File

@ -18,48 +18,38 @@
package gtserror
import (
"errors"
"net/http"
"codeberg.org/gruf/go-byteutil"
)
// New returns a new error, prepended with caller function name if gtserror.Caller is enabled.
func New(msg string) error {
return newAt(3, msg)
}
// Newf returns a new formatted error, prepended with caller function name if gtserror.Caller is enabled.
func Newf(msgf string, args ...any) error {
return newfAt(3, msgf, args...)
}
// NewResponseError crafts an error from provided HTTP response
// including the method, status and body (if any provided). This
// will also wrap the returned error using WithStatusCode().
func NewResponseError(rsp *http.Response) error {
var buf byteutil.Buffer
// Get URL string ahead of time.
urlStr := rsp.Request.URL.String()
// Alloc guesstimate of required buf size.
buf.Guarantee(0 +
len(rsp.Request.Method) +
12 + // request to
len(urlStr) +
17 + // failed: status="
len(rsp.Status) +
8 + // " body="
256 + // max body size
1, // "
)
// Build error message string without
// will also wrap the returned error using WithStatusCode() and
// will include the caller function name as a prefix.
func NewFromResponse(rsp *http.Response) error {
// Build error with message without
// using "fmt", as chances are this will
// be used in a hot code path and we
// know all the incoming types involved.
_, _ = buf.WriteString(rsp.Request.Method)
_, _ = buf.WriteString(" request to ")
_, _ = buf.WriteString(urlStr)
_, _ = buf.WriteString(" failed: status=\"")
_, _ = buf.WriteString(rsp.Status)
_, _ = buf.WriteString("\" body=\"")
_, _ = buf.WriteString(drainBody(rsp.Body, 256))
_, _ = buf.WriteString("\"")
// Create new error from msg.
err := errors.New(buf.String())
err := newAt(3, ""+
rsp.Request.Method+
" request to "+
rsp.Request.URL.String()+
" failed: status=\""+
rsp.Status+
"\" body=\""+
drainBody(rsp.Body, 256)+
"\"",
)
// Wrap error to provide status code.
return WithStatusCode(err, rsp.StatusCode)

View File

@ -0,0 +1,92 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build !noerrcaller
package gtserror
import (
"errors"
"fmt"
"runtime"
"strings"
)
// Caller returns whether created errors will prepend calling function name.
const Caller = true
// cerror wraps an error with a string
// prefix of the caller function name.
type cerror struct {
c string
e error
}
func (ce *cerror) Error() string {
msg := ce.e.Error()
return ce.c + ": " + msg
}
func (ce *cerror) Unwrap() error {
return ce.e
}
// newAt is the same as New() but allows specifying calldepth.
func newAt(calldepth int, msg string) error {
return &cerror{
c: caller(calldepth + 1),
e: errors.New(msg),
}
}
// newfAt is the same as Newf() but allows specifying calldepth.
func newfAt(calldepth int, msgf string, args ...any) error {
return &cerror{
c: caller(calldepth + 1),
e: fmt.Errorf(msgf, args...),
}
}
// caller fetches the calling function name, skipping 'depth'. Results are cached per PC.
func caller(depth int) string {
var pcs [1]uintptr
// Fetch calling function using calldepth
_ = runtime.Callers(depth, pcs[:])
fn := runtime.FuncForPC(pcs[0])
if fn == nil {
return ""
}
// Get func name.
name := fn.Name()
// Drop everything but but function name itself
if idx := strings.LastIndexByte(name, '.'); idx >= 0 {
name = name[idx+1:]
}
const params = `[...]`
// Drop any generic type parameter markers
if idx := strings.Index(name, params); idx >= 0 {
name = name[:idx] + name[idx+len(params):]
}
return name
}

View File

@ -0,0 +1,38 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build noerrcaller
package gtserror
import (
"errors"
"fmt"
)
// Caller returns whether created errors will prepend calling function name.
const Caller = false
// newAt is the same as New() but allows specifying calldepth.
func newAt(_ int, msg string) error {
return errors.New(msg)
}
// newfAt is the same as Newf() but allows specifying calldepth.
func newfAt(_ int, msgf string, args ...any) error {
return fmt.Errorf(msgf, args...)
}

View File

@ -10,6 +10,7 @@ import (
"testing"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
func TestResponseError(t *testing.T) {
@ -53,13 +54,19 @@ func testResponseError(t *testing.T, rsp http.Response) {
body = string(b[:trunc])
}
expect := fmt.Sprintf(
"%s request to %s failed: status=\"%s\" body=\"%s\"",
"%s%s request to %s failed: status=\"%s\" body=\"%s\"",
func() string {
if gtserror.Caller {
return strings.Split(log.Caller(3), ".")[1] + ": "
}
return ""
}(),
rsp.Request.Method,
rsp.Request.URL.String(),
rsp.Status,
body,
)
err := gtserror.NewResponseError(&rsp)
err := gtserror.NewFromResponse(&rsp)
if str := err.Error(); str != expect {
t.Errorf("unexpected error string: recv=%q expct=%q", str, expect)
}

View File

@ -34,12 +34,7 @@ func Caller(depth int) string {
return ""
}
// return formatted name
return callername(fn)
}
// callername generates a human-readable calling function name.
func callername(fn *runtime.Func) string {
// Get func name.
name := fn.Name()
// Drop all but the package name and function name, no mod path

View File

@ -21,8 +21,10 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"codeberg.org/gruf/go-iotools"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-sched"
"codeberg.org/gruf/go-store/v2/storage"
@ -47,112 +49,7 @@ var SupportedEmojiMIMETypes = []string{
mimeImagePng,
}
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
/*
PROCESSING FUNCTIONS
*/
// PreProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the media data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed.
PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
// ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue.
ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
// PreProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
// ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
/*
PRUNING/UNCACHING FUNCTIONS
*/
// PruneAll runs all of the below pruning/uncacheing functions, and then cleans up any resulting
// empty directories from the storage driver. It can be called as a shortcut for calling the below
// pruning functions one by one.
//
// If blocking is true, then any errors encountered during the prune will be combined + returned to
// the caller. If blocking is false, the prune is run in the background and errors are just logged
// instead.
PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error
// UncacheRemote uncaches all remote media attachments older than the given amount of days.
//
// In this context, uncacheing means deleting media files from storage and marking the attachment
// as cached=false in the database.
//
// If 'dry' is true, then only a dry run will be performed: nothing will actually be changed.
//
// The returned int is the amount of media that was/would be uncached by this function.
UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error)
// PruneUnusedRemote prunes unused/out of date headers and avatars cached on this instance.
//
// The returned int is the amount of media that was pruned by this function.
PruneUnusedRemote(ctx context.Context, dry bool) (int, error)
// PruneUnusedLocal prunes unused media attachments that were uploaded by
// a user on this instance, but never actually attached to a status, or attached but
// later detached.
//
// The returned int is the amount of media that was pruned by this function.
PruneUnusedLocal(ctx context.Context, dry bool) (int, error)
// PruneOrphaned prunes files that exist in storage but which do not have a corresponding
// entry in the database.
//
// If dry is true, then nothing will be changed, only the amount that *would* be removed
// is returned to the caller.
PruneOrphaned(ctx context.Context, dry bool) (int, error)
/*
REFETCHING FUNCTIONS
Useful when data loss has occurred.
*/
// RefetchEmojis iterates through remote emojis (for the given domain, or all if domain is empty string).
//
// For each emoji, the manager will check whether both the full size and static images are present in storage.
// If not, the manager will refetch and reprocess full size and static images for the emoji.
//
// The provided DereferenceMedia function will be used when it's necessary to refetch something this way.
RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error)
}
type manager struct {
type Manager struct {
state *state.State
}
@ -162,13 +59,24 @@ type manager struct {
// a limited number of media will be processed in parallel. The numbers of workers
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
// See internal/concurrency.NewWorkerPool() documentation for further information.
func NewManager(state *state.State) Manager {
m := &manager{state: state}
func NewManager(state *state.State) *Manager {
m := &Manager{state: state}
scheduleCleanupJobs(m)
return m
}
func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// PreProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the media data.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed.
func (m *Manager) PreProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
id, err := id.NewRandomULID()
if err != nil {
return nil, err
@ -248,14 +156,16 @@ func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData P
processingMedia := &ProcessingMedia{
media: attachment,
dataFn: data,
postFn: postData,
mgr: m,
}
return processingMedia, nil
}
func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
func (m *Manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, attachmentID string) (*ProcessingMedia, error) {
// get the existing attachment from database.
attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID)
if err != nil {
@ -265,7 +175,6 @@ func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, pos
processingMedia := &ProcessingMedia{
media: attachment,
dataFn: data,
postFn: postData,
recache: true, // indicate it's a recache
mgr: m,
}
@ -273,9 +182,10 @@ func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, pos
return processingMedia, nil
}
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue.
func (m *Manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// Create a new processing media object for this media request.
media, err := m.PreProcessMedia(ctx, data, postData, accountID, ai)
media, err := m.PreProcessMedia(ctx, data, accountID, ai)
if err != nil {
return nil, err
}
@ -286,7 +196,22 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData Post
return media, nil
}
func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// PreProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err)
@ -299,36 +224,38 @@ func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData P
)
if refresh {
// Look for existing emoji by given ID.
emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID)
if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err)
}
// if this is a refresh, we will end up with new images
// stored for this emoji, so we can use the postData function
// stored for this emoji, so we can use an io.Closer callback
// to perform clean up of the old images from storage
originalPostData := postData
originalData := data
originalImagePath := emoji.ImagePath
originalImageStaticPath := emoji.ImageStaticPath
postData = func(innerCtx context.Context) error {
// trigger the original postData function if it was provided
if originalPostData != nil {
if err := originalPostData(innerCtx); err != nil {
return err
data = func(ctx context.Context) (io.ReadCloser, int64, error) {
// Call original data func.
rc, sz, err := originalData(ctx)
if err != nil {
return nil, 0, err
}
// Wrap closer to cleanup old data.
c := iotools.CloserCallback(rc, func() {
if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
log.Errorf(ctx, "error removing old emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
}
l := log.WithContext(ctx).
WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain)
l.Debug("postData: cleaning up old emoji files for refreshed emoji")
if err := m.state.Storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err)
}
if err := m.state.Storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err)
}
if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
log.Errorf(ctx, "error removing old static emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
})
return nil
// Return newly wrapped readcloser and size.
return iotools.ReadCloser(rc, c), sz, nil
}
newPathID, err = id.NewRandomULID()
@ -410,16 +337,16 @@ func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData P
refresh: refresh,
newPathID: newPathID,
dataFn: data,
postFn: postData,
mgr: m,
}
return processingEmoji, nil
}
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
func (m *Manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// Create a new processing emoji object for this emoji request.
emoji, err := m.PreProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh)
emoji, err := m.PreProcessEmoji(ctx, data, shortcode, id, uri, ai, refresh)
if err != nil {
return nil, err
}
@ -430,7 +357,7 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
return emoji, nil
}
func scheduleCleanupJobs(m *manager) {
func scheduleCleanupJobs(m *Manager) {
const day = time.Hour * 24
// Calculate closest midnight.

View File

@ -54,7 +54,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlocking() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "rainbow_test", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "rainbow_test", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -124,7 +124,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingRefresh() {
emojiID := emojiToUpdate.ID
emojiURI := emojiToUpdate.URI
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "yell", emojiID, emojiURI, &media.AdditionalEmojiInfo{
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "yell", emojiID, emojiURI, &media.AdditionalEmojiInfo{
CreatedAt: &emojiToUpdate.CreatedAt,
Domain: &emojiToUpdate.Domain,
ImageRemoteURL: &newImageRemoteURL,
@ -208,7 +208,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLarge() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "big_panda", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "big_panda", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -232,7 +232,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLargeNoSizeGiven() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "big_panda", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "big_panda", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -257,7 +257,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingNoFileSizeGiven() {
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
// process the media with no additional info provided
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "rainbow_test", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "rainbow_test", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -318,7 +318,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -390,7 +390,7 @@ func (suite *ManagerTestSuite) TestSlothVineProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -469,7 +469,7 @@ func (suite *ManagerTestSuite) TestLongerMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -545,7 +545,7 @@ func (suite *ManagerTestSuite) TestBirdnestMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -623,7 +623,7 @@ func (suite *ManagerTestSuite) TestNotAnMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// pre processing should go fine but...
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// we should get an error while loading
@ -648,7 +648,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingNoContentLengthGiven
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -721,7 +721,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingReadCloser() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -793,7 +793,7 @@ func (suite *ManagerTestSuite) TestPngNoAlphaChannelProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -865,7 +865,7 @@ func (suite *ManagerTestSuite) TestPngAlphaChannelProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -934,18 +934,10 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
return io.NopCloser(bytes.NewBuffer(b)), int64(len(b)), nil
}
// test the callback function by setting a simple boolean
var calledPostData bool
postData := func(_ context.Context) error {
calledPostData = true
return nil
}
suite.False(calledPostData) // not called yet (obvs)
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, postData, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -955,9 +947,6 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
suite.NoError(err)
suite.NotNil(attachment)
// the post data callback should have been called
suite.True(calledPostData)
// make sure it's got the stuff set on it that we expect
// the attachment ID and accountID we expect
suite.Equal(attachmentID, attachment.ID)
@ -1021,7 +1010,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
@ -1102,7 +1091,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() {
inProcess := []*media.ProcessingMedia{}
for i := 0; i < spam; i++ {
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
inProcess = append(inProcess, processingMedia)
}
@ -1203,7 +1192,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
suite.manager = diskManager
// process the media with no additional info provided
processingMedia, err := diskManager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := diskManager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()

View File

@ -34,7 +34,7 @@ type MediaStandardTestSuite struct {
db db.DB
storage *storage.Driver
state state.State
manager media.Manager
manager *media.Manager
transportController transport.Controller
testAttachments map[string]*gtsmodel.MediaAttachment
testAccounts map[string]*gtsmodel.Account

View File

@ -36,16 +36,15 @@ import (
// ProcessingEmoji represents an emoji currently processing. It exposes
// various functions for retrieving data from the process.
type ProcessingEmoji struct {
instAccID string // instance account ID
emoji *gtsmodel.Emoji // processing emoji details
refresh bool // whether this is an existing emoji being refreshed
newPathID string // new emoji path ID to use if refreshed
dataFn DataFunc // load-data function, returns media stream
postFn PostDataCallbackFunc // post data callback function
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *manager // mgr instance (access to db / storage)
instAccID string // instance account ID
emoji *gtsmodel.Emoji // processing emoji details
refresh bool // whether this is an existing emoji being refreshed
newPathID string // new emoji path ID to use if refreshed
dataFn DataFunc // load-data function, returns media stream
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *Manager // mgr instance (access to db / storage)
}
// EmojiID returns the ID of the underlying emoji without blocking processing.
@ -158,17 +157,6 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro
// and updates the underlying attachment fields as necessary. It will then stream
// bytes from p's reader directly into storage so that it can be retrieved later.
func (p *ProcessingEmoji) store(ctx context.Context) error {
defer func() {
if p.postFn == nil {
return
}
// Ensure post callback gets called.
if err := p.postFn(ctx); err != nil {
log.Errorf(ctx, "error executing postdata function: %v", err)
}
}()
// Load media from provided data fn.
rc, sz, err := p.dataFn(ctx)
if err != nil {

View File

@ -40,12 +40,11 @@ import (
type ProcessingMedia struct {
media *gtsmodel.MediaAttachment // processing media attachment details
dataFn DataFunc // load-data function, returns media stream
postFn PostDataCallbackFunc // post data callback function
recache bool // recaching existing (uncached) media
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *manager // mgr instance (access to db / storage)
mgr *Manager // mgr instance (access to db / storage)
}
// AttachmentID returns the ID of the underlying media attachment without blocking processing.
@ -143,17 +142,6 @@ func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment,
// and updates the underlying attachment fields as necessary. It will then stream
// bytes from p's reader directly into storage so that it can be retrieved later.
func (p *ProcessingMedia) store(ctx context.Context) error {
defer func() {
if p.postFn == nil {
return
}
// ensure post callback gets called.
if err := p.postFn(ctx); err != nil {
log.Errorf(ctx, "error executing postdata function: %v", err)
}
}()
// Load media from provided data fun
rc, sz, err := p.dataFn(ctx)
if err != nil {

View File

@ -37,7 +37,14 @@ const (
unusedLocalAttachmentDays = 3 // Number of days to keep local media in storage if not attached to a status.
)
func (m *manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error {
// PruneAll runs all of the below pruning/uncacheing functions, and then cleans up any resulting
// empty directories from the storage driver. It can be called as a shortcut for calling the below
// pruning functions one by one.
//
// If blocking is true, then any errors encountered during the prune will be combined + returned to
// the caller. If blocking is false, the prune is run in the background and errors are just logged
// instead.
func (m *Manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error {
const dry = false
f := func(innerCtx context.Context) error {
@ -93,7 +100,10 @@ func (m *manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocki
return nil
}
func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) {
// PruneUnusedRemote prunes unused/out of date headers and avatars cached on this instance.
//
// The returned int is the amount of media that was pruned by this function.
func (m *Manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) {
var (
totalPruned int
maxID string
@ -152,7 +162,12 @@ func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error)
return totalPruned, nil
}
func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
// PruneOrphaned prunes files that exist in storage but which do not have a corresponding
// entry in the database.
//
// If dry is true, then nothing will be changed, only the amount that *would* be removed
// is returned to the caller.
func (m *Manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
// Emojis are stored under the instance account, so we
// need the ID of the instance account for the next part.
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
@ -200,7 +215,7 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
return m.removeFiles(ctx, orphanedKeys...)
}
func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
func (m *Manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
pathParts := regexes.FilePath.FindStringSubmatch(key)
if len(pathParts) != 6 {
// This doesn't match our expectations so
@ -239,7 +254,15 @@ func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID st
return orphaned, nil
}
func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error) {
// UncacheRemote uncaches all remote media attachments older than the given amount of days.
//
// In this context, uncacheing means deleting media files from storage and marking the attachment
// as cached=false in the database.
//
// If 'dry' is true, then only a dry run will be performed: nothing will actually be changed.
//
// The returned int is the amount of media that was/would be uncached by this function.
func (m *Manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error) {
if olderThanDays < 0 {
return 0, nil
}
@ -276,7 +299,12 @@ func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool
return totalPruned, nil
}
func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
// PruneUnusedLocal prunes unused media attachments that were uploaded by
// a user on this instance, but never actually attached to a status, or attached but
// later detached.
//
// The returned int is the amount of media that was pruned by this function.
func (m *Manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
olderThan := time.Now().Add(-time.Hour * 24 * time.Duration(unusedLocalAttachmentDays))
if dry {
@ -313,7 +341,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
Handy little helpers
*/
func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
func (m *Manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
return err
}
@ -322,7 +350,7 @@ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.Med
return m.state.DB.DeleteAttachment(ctx, attachment.ID)
}
func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
func (m *Manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
return err
}
@ -332,7 +360,7 @@ func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.Me
return m.state.DB.UpdateAttachment(ctx, attachment, "cached")
}
func (m *manager) removeFiles(ctx context.Context, keys ...string) (int, error) {
func (m *Manager) removeFiles(ctx context.Context, keys ...string) (int, error) {
errs := make(gtserror.MultiError, 0, len(keys))
for _, key := range keys {

View File

@ -312,7 +312,7 @@ func (suite *PruneTestSuite) TestUncacheAndRecache() {
testStatusAttachment,
testHeader,
} {
processingRecache, err := suite.manager.PreProcessMediaRecache(ctx, data, nil, original.ID)
processingRecache, err := suite.manager.PreProcessMediaRecache(ctx, data, original.ID)
suite.NoError(err)
// synchronously load the recached attachment

View File

@ -32,7 +32,13 @@ import (
type DereferenceMedia func(ctx context.Context, iri *url.URL) (io.ReadCloser, int64, error)
func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error) {
// RefetchEmojis iterates through remote emojis (for the given domain, or all if domain is empty string).
//
// For each emoji, the manager will check whether both the full size and static images are present in storage.
// If not, the manager will refetch and reprocess full size and static images for the emoji.
//
// The provided DereferenceMedia function will be used when it's necessary to refetch something this way.
func (m *Manager) RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error) {
// normalize domain
if domain == "" {
domain = db.EmojiAllDomains
@ -107,7 +113,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM
return dereferenceMedia(ctx, emojiImageIRI)
}
processingEmoji, err := m.PreProcessEmoji(ctx, dataFunc, nil, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{
processingEmoji, err := m.PreProcessEmoji(ctx, dataFunc, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{
Domain: &emoji.Domain,
ImageRemoteURL: &emoji.ImageRemoteURL,
ImageStaticRemoteURL: &emoji.ImageStaticRemoteURL,
@ -131,7 +137,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM
return totalRefetched, nil
}
func (m *manager) emojiRequiresRefetch(ctx context.Context, emoji *gtsmodel.Emoji) (bool, error) {
func (m *Manager) emojiRequiresRefetch(ctx context.Context, emoji *gtsmodel.Emoji) (bool, error) {
if has, err := m.state.Storage.Has(ctx, emoji.ImagePath); err != nil {
return false, err
} else if !has {

View File

@ -110,9 +110,3 @@ type AdditionalEmojiInfo struct {
// DataFunc represents a function used to retrieve the raw bytes of a piece of media.
type DataFunc func(ctx context.Context) (reader io.ReadCloser, fileSize int64, err error)
// PostDataCallbackFunc represents a function executed after the DataFunc has been executed,
// and the returned reader has been read. It can be used to clean up any remaining resources.
//
// This can be set to nil, and will then not be executed.
type PostDataCallbackFunc func(ctx context.Context) error

View File

@ -34,7 +34,7 @@ import (
type Processor struct {
state *state.State
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
oauthServer oauth.Server
filter *visibility.Filter
formatter text.Formatter
@ -46,7 +46,7 @@ type Processor struct {
func New(
state *state.State,
tc typeutils.TypeConverter,
mediaManager media.Manager,
mediaManager *media.Manager,
oauthServer oauth.Server,
federator federation.Federator,
filter *visibility.Filter,

View File

@ -45,7 +45,7 @@ type AccountStandardTestSuite struct {
tc typeutils.TypeConverter
storage *storage.Driver
state state.State
mediaManager media.Manager
mediaManager *media.Manager
oauthServer oauth.Server
fromClientAPIChan chan messages.FromClientAPI
transportController transport.Controller

View File

@ -300,7 +300,7 @@ func (p *Processor) UpdateAvatar(ctx context.Context, avatar *multipart.FileHead
Description: description,
}
processingMedia, err := p.mediaManager.PreProcessMedia(ctx, dataFunc, nil, accountID, ai)
processingMedia, err := p.mediaManager.PreProcessMedia(ctx, dataFunc, accountID, ai)
if err != nil {
return nil, fmt.Errorf("UpdateAvatar: error processing avatar: %s", err)
}
@ -327,7 +327,7 @@ func (p *Processor) UpdateHeader(ctx context.Context, header *multipart.FileHead
Header: &isHeader,
}
processingMedia, err := p.mediaManager.PreProcessMedia(ctx, dataFunc, nil, accountID, ai)
processingMedia, err := p.mediaManager.PreProcessMedia(ctx, dataFunc, accountID, ai)
if err != nil {
return nil, fmt.Errorf("UpdateHeader: error processing header: %s", err)
}

View File

@ -28,13 +28,13 @@ import (
type Processor struct {
state *state.State
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
transportController transport.Controller
emailSender email.Sender
}
// New returns a new admin processor.
func New(state *state.State, tc typeutils.TypeConverter, mediaManager media.Manager, transportController transport.Controller, emailSender email.Sender) Processor {
func New(state *state.State, tc typeutils.TypeConverter, mediaManager *media.Manager, transportController transport.Controller, emailSender email.Sender) Processor {
return Processor{
state: state,
tc: tc,

View File

@ -74,7 +74,7 @@ func (p *Processor) EmojiCreate(ctx context.Context, account *gtsmodel.Account,
}
}
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, nil, form.Shortcode, emojiID, emojiURI, ai, false)
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, form.Shortcode, emojiID, emojiURI, ai, false)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error processing emoji: %s", err), "error processing emoji")
}
@ -355,7 +355,7 @@ func (p *Processor) emojiUpdateCopy(ctx context.Context, emoji *gtsmodel.Emoji,
}
}
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, nil, *shortcode, newEmojiID, newEmojiURI, ai, false)
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, *shortcode, newEmojiID, newEmojiURI, ai, false)
if err != nil {
err = fmt.Errorf("emojiUpdateCopy: error processing emoji %s: %s", emoji.ID, err)
return nil, gtserror.NewErrorInternalError(err)
@ -461,7 +461,7 @@ func (p *Processor) emojiUpdateModify(ctx context.Context, emoji *gtsmodel.Emoji
}
}
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, nil, emoji.Shortcode, emoji.ID, emoji.URI, ai, true)
processingEmoji, err := p.mediaManager.PreProcessEmoji(ctx, data, emoji.Shortcode, emoji.ID, emoji.URI, ai, true)
if err != nil {
err = fmt.Errorf("emojiUpdateModify: error processing emoji %s: %s", emoji.ID, err)
return nil, gtserror.NewErrorInternalError(err)

View File

@ -379,8 +379,8 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder
return errors.New("Accountable was not parseable on update account message")
}
// Call RefreshAccount to fetch up-to-date bio, avatar, header, etc.
updatedAccount, _, err := p.federator.RefreshAccount(
// Fetch up-to-date bio, avatar, header, etc.
_, _, err := p.federator.RefreshAccount(
ctx,
federatorMsg.ReceivingAccount.Username,
incomingAccount,
@ -391,11 +391,6 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder
return fmt.Errorf("error enriching updated account from federator: %s", err)
}
// RefreshAccount doesn't make DB update calls, so do that here.
if err := p.state.DB.UpdateAccount(ctx, updatedAccount); err != nil {
return fmt.Errorf("error enriching updated account from federator: %s", err)
}
return nil
}

View File

@ -42,7 +42,7 @@ func (p *Processor) Create(ctx context.Context, account *gtsmodel.Account, form
}
// process the media attachment and load it immediately
media, err := p.mediaManager.PreProcessMedia(ctx, data, nil, account.ID, &media.AdditionalMediaInfo{
media, err := p.mediaManager.PreProcessMedia(ctx, data, account.ID, &media.AdditionalMediaInfo{
Description: &form.Description,
FocusX: &focusX,
FocusY: &focusY,

View File

@ -148,8 +148,7 @@ func (p *Processor) getAttachmentContent(ctx context.Context, requestingAccount
// [
// the reason it was removed was because a slow
// client connection could hold open a storage
// recache operation, and so holding open a media
// worker worker.
// recache operation -> holding open a media worker.
// ]
dataFn := func(innerCtx context.Context) (io.ReadCloser, int64, error) {
@ -161,7 +160,7 @@ func (p *Processor) getAttachmentContent(ctx context.Context, requestingAccount
}
// Start recaching this media with the prepared data function.
processingMedia, err := p.mediaManager.PreProcessMediaRecache(ctx, dataFn, nil, wantedMediaID)
processingMedia, err := p.mediaManager.PreProcessMediaRecache(ctx, dataFn, wantedMediaID)
if err != nil {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))
}

View File

@ -27,12 +27,12 @@ import (
type Processor struct {
state *state.State
tc typeutils.TypeConverter
mediaManager media.Manager
mediaManager *media.Manager
transportController transport.Controller
}
// New returns a new media processor.
func New(state *state.State, tc typeutils.TypeConverter, mediaManager media.Manager, transportController transport.Controller) Processor {
func New(state *state.State, tc typeutils.TypeConverter, mediaManager *media.Manager, transportController transport.Controller) Processor {
return Processor{
state: state,
tc: tc,

View File

@ -37,7 +37,7 @@ type MediaStandardTestSuite struct {
tc typeutils.TypeConverter
storage *storage.Driver
state state.State
mediaManager media.Manager
mediaManager *media.Manager
transportController transport.Controller
// standard suite models

View File

@ -44,7 +44,7 @@ type Processor struct {
federator federation.Federator
tc typeutils.TypeConverter
oauthServer oauth.Server
mediaManager mm.Manager
mediaManager *mm.Manager
statusTimelines timeline.Manager
state *state.State
emailSender email.Sender
@ -101,7 +101,7 @@ func NewProcessor(
tc typeutils.TypeConverter,
federator federation.Federator,
oauthServer oauth.Server,
mediaManager mm.Manager,
mediaManager *mm.Manager,
state *state.State,
emailSender email.Sender,
) *Processor {

View File

@ -39,7 +39,7 @@ type ProcessingStandardTestSuite struct {
db db.DB
storage *storage.Driver
state state.State
mediaManager media.Manager
mediaManager *media.Manager
typeconverter typeutils.TypeConverter
httpClient *testrig.MockHTTPClient
transportController transport.Controller

View File

@ -40,7 +40,7 @@ type StatusStandardTestSuite struct {
tc transport.Controller
storage *storage.Driver
state state.State
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
// standard suite models

View File

@ -130,7 +130,7 @@ func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error {
if code := rsp.StatusCode; code != http.StatusOK &&
code != http.StatusCreated && code != http.StatusAccepted {
return gtserror.NewResponseError(rsp)
return gtserror.NewFromResponse(rsp)
}
return nil

View File

@ -65,7 +65,7 @@ func (t *transport) Dereference(ctx context.Context, iri *url.URL) ([]byte, erro
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
return nil, gtserror.NewResponseError(rsp)
return nil, gtserror.NewFromResponse(rsp)
}
return io.ReadAll(rsp.Body)

View File

@ -102,7 +102,7 @@ func dereferenceByAPIV1Instance(ctx context.Context, t *transport, iri *url.URL)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, gtserror.NewResponseError(resp)
return nil, gtserror.NewFromResponse(resp)
}
b, err := io.ReadAll(resp.Body)
@ -252,7 +252,7 @@ func callNodeInfoWellKnown(ctx context.Context, t *transport, iri *url.URL) (*ur
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, gtserror.NewResponseError(resp)
return nil, gtserror.NewFromResponse(resp)
}
b, err := io.ReadAll(resp.Body)
@ -303,7 +303,7 @@ func callNodeInfo(ctx context.Context, t *transport, iri *url.URL) (*apimodel.No
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, gtserror.NewResponseError(resp)
return nil, gtserror.NewFromResponse(resp)
}
b, err := io.ReadAll(resp.Body)

View File

@ -46,7 +46,7 @@ func (t *transport) DereferenceMedia(ctx context.Context, iri *url.URL) (io.Read
// Check for an expected status code
if rsp.StatusCode != http.StatusOK {
return nil, 0, gtserror.NewResponseError(rsp)
return nil, 0, gtserror.NewFromResponse(rsp)
}
return rsp.Body, rsp.ContentLength, nil

View File

@ -156,7 +156,7 @@ func (t *transport) Finger(ctx context.Context, targetUsername string, targetDom
}
// We've reached the end of the line here, both the original request
// and our attempt to resolve it through the fallback have failed
return nil, gtserror.NewResponseError(rsp)
return nil, gtserror.NewFromResponse(rsp)
}
// Set the URL in cache here, since host-meta told us this should be the

View File

@ -38,7 +38,7 @@ type TransportTestSuite struct {
suite.Suite
db db.DB
storage *storage.Driver
mediaManager media.Manager
mediaManager *media.Manager
federator federation.Federator
processor *processing.Processor
emailSender email.Sender

View File

@ -1,11 +1,27 @@
#!/bin/sh
set -eu
set -e
# DEBUG returns whether DEBUG build is enabled.
DEBUG() { [ ! -z "${DEBUG-}" ]; }
# Log and execute provided args.
log_exec() { echo "$ ${*}"; "$@"; }
CGO_ENABLED=0 go build -trimpath \
-tags "netgo osusergo static_build kvformat $(DEBUG && echo 'debugenv')" \
-ldflags="-s -w -extldflags '-static' -X 'main.Version=${VERSION:-$(git describe --tags --abbrev=0)}'" \
# Grab environment variables and set defaults + requirements.
GO_BUILDTAGS="${GO_BUILDTAGS-} netgo osusergo static_build kvformat"
GO_LDFLAGS="${GO_LDFLAGS-} -s -w -extldflags '-static' -X 'main.Version=${VERSION:-$(git describe --tags --abbrev=0)}'"
GO_GCFLAGS=${GO_GCFLAGS-}
# Maintain old $DEBUG compat.
[ ! -z "$DEBUG" ] && \
GO_BUILDTAGS="${GO_BUILDTAGS} debugenv"
# Available Go build tags, with explanation, followed by benefits of enabling it:
# - kvformat: enables prettier output of log fields (slightly better performance)
# - notracing: disables compiling-in otel tracing support (reduced binary size, better performance)
# - noerrcaller: disables caller function prefix in errors (slightly better performance, at cost of err readability)
# - debug: enables /debug/pprof endpoint (adds debug, at performance cost)
# - debugenv: enables /debug/pprof endpoint if DEBUG=1 env during runtime (adds debug, at performance cost)
log_exec env CGO_ENABLED=0 go build -trimpath -v \
-tags "${GO_BUILDTAGS}" \
-ldflags="${GO_LDFLAGS}" \
-gcflags="${GO_GCFLAGS}" \
./cmd/gotosocial

View File

@ -25,6 +25,6 @@ import (
)
// NewTestFederator returns a federator with the given database and (mock!!) transport controller.
func NewTestFederator(state *state.State, tc transport.Controller, mediaManager media.Manager) federation.Federator {
func NewTestFederator(state *state.State, tc transport.Controller, mediaManager *media.Manager) federation.Federator {
return federation.NewFederator(state, NewTestFederatingDB(state), tc, NewTestTypeConverter(state.DB), mediaManager)
}

View File

@ -23,7 +23,7 @@ import (
)
// NewTestMediaManager returns a media handler with the default test config, and the given db and storage.
func NewTestMediaManager(state *state.State) media.Manager {
func NewTestMediaManager(state *state.State) *media.Manager {
StartWorkers(state) // ensure started
return media.NewManager(state)
}

View File

@ -26,7 +26,7 @@ import (
)
// NewTestProcessor returns a Processor suitable for testing purposes
func NewTestProcessor(state *state.State, federator federation.Federator, emailSender email.Sender, mediaManager media.Manager) *processing.Processor {
func NewTestProcessor(state *state.State, federator federation.Federator, emailSender email.Sender, mediaManager *media.Manager) *processing.Processor {
p := processing.NewProcessor(NewTestTypeConverter(state.DB), federator, NewTestOauthServer(state.DB), mediaManager, state, emailSender)
state.Workers.EnqueueClientAPI = p.EnqueueClientAPI
state.Workers.EnqueueFederator = p.EnqueueFederator