From 5e2897e35cd2bea889fa37a2a857f4dcc076dafc Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Sun, 11 Jun 2023 11:18:44 +0200 Subject: [PATCH] [bugfix] Invalidate timeline entries for status when stats change (#1879) --- internal/db/bundb/statusfave.go | 43 ++++++++ internal/db/statusfave.go | 3 + internal/processing/fromclientapi.go | 124 +++++++++++++++------ internal/processing/fromcommon.go | 33 ++++-- internal/processing/fromfederator.go | 138 +++++++++++------------- internal/processing/status/bookmark.go | 14 ++- internal/processing/status/common.go | 37 ++++++- internal/processing/status/pin.go | 26 +++-- internal/timeline/manager.go | 35 +++++- internal/timeline/timeline.go | 16 ++- internal/timeline/unprepare.go | 50 +++++++++ internal/timeline/unprepare_test.go | 142 +++++++++++++++++++++++++ 12 files changed, 531 insertions(+), 130 deletions(-) create mode 100644 internal/timeline/unprepare.go create mode 100644 internal/timeline/unprepare_test.go diff --git a/internal/db/bundb/statusfave.go b/internal/db/bundb/statusfave.go index 497262530..1c96a1dd0 100644 --- a/internal/db/bundb/statusfave.go +++ b/internal/db/bundb/statusfave.go @@ -24,6 +24,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/state" @@ -145,6 +146,48 @@ func (s *statusFaveDB) GetStatusFavesForStatus(ctx context.Context, statusID str return faves, nil } +func (s *statusFaveDB) PopulateStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) error { + var ( + err error + errs = make(gtserror.MultiError, 0, 3) + ) + + if statusFave.Account == nil { + // StatusFave author is not set, fetch from database. + statusFave.Account, err = s.state.DB.GetAccountByID( + gtscontext.SetBarebones(ctx), + statusFave.AccountID, + ) + if err != nil { + errs.Append(fmt.Errorf("error populating status fave author: %w", err)) + } + } + + if statusFave.TargetAccount == nil { + // StatusFave target account is not set, fetch from database. + statusFave.TargetAccount, err = s.state.DB.GetAccountByID( + gtscontext.SetBarebones(ctx), + statusFave.TargetAccountID, + ) + if err != nil { + errs.Append(fmt.Errorf("error populating status fave target account: %w", err)) + } + } + + if statusFave.Status == nil { + // StatusFave status is not set, fetch from database. + statusFave.Status, err = s.state.DB.GetStatusByID( + gtscontext.SetBarebones(ctx), + statusFave.StatusID, + ) + if err != nil { + errs.Append(fmt.Errorf("error populating status fave status: %w", err)) + } + } + + return errs.Combine() +} + func (s *statusFaveDB) PutStatusFave(ctx context.Context, fave *gtsmodel.StatusFave) db.Error { return s.state.Caches.GTS.StatusFave().Store(fave, func() error { _, err := s.conn. diff --git a/internal/db/statusfave.go b/internal/db/statusfave.go index b435da514..98ff1d69d 100644 --- a/internal/db/statusfave.go +++ b/internal/db/statusfave.go @@ -35,6 +35,9 @@ type StatusFave interface { // This slice will be unfiltered, not taking account of blocks and whatnot, so filter it before serving it back to a user. GetStatusFavesForStatus(ctx context.Context, statusID string) ([]*gtsmodel.StatusFave, Error) + // PopulateStatusFave ensures that all sub-models of a fave are populated (account, status, etc). + PopulateStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) error + // PutStatusFave inserts the given statusFave into the database. PutStatusFave(ctx context.Context, statusFave *gtsmodel.StatusFave) Error diff --git a/internal/processing/fromclientapi.go b/internal/processing/fromclientapi.go index 41bf6ee40..159f09d1b 100644 --- a/internal/processing/fromclientapi.go +++ b/internal/processing/fromclientapi.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams" "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" @@ -157,14 +158,24 @@ func (p *Processor) processCreateAccountFromClientAPI(ctx context.Context, clien func (p *Processor) processCreateStatusFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { status, ok := clientMsg.GTSModel.(*gtsmodel.Status) if !ok { - return errors.New("note was not parseable as *gtsmodel.Status") + return gtserror.New("status was not parseable as *gtsmodel.Status") } if err := p.timelineAndNotifyStatus(ctx, status); err != nil { - return err + return gtserror.Newf("error timelining status: %w", err) } - return p.federateStatus(ctx, status) + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.federateStatus(ctx, status); err != nil { + return gtserror.Newf("error federating status: %w", err) + } + + return nil } func (p *Processor) processCreateFollowRequestFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { @@ -181,33 +192,50 @@ func (p *Processor) processCreateFollowRequestFromClientAPI(ctx context.Context, } func (p *Processor) processCreateFaveFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { - fave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave) + statusFave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { - return errors.New("fave was not parseable as *gtsmodel.StatusFave") + return gtserror.New("statusFave was not parseable as *gtsmodel.StatusFave") } - if err := p.notifyFave(ctx, fave); err != nil { - return err + if err := p.notifyFave(ctx, statusFave); err != nil { + return gtserror.Newf("error notifying status fave: %w", err) } - return p.federateFave(ctx, fave, clientMsg.OriginAccount, clientMsg.TargetAccount) + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + + if err := p.federateFave(ctx, statusFave, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil { + return gtserror.Newf("error federating status fave: %w", err) + } + + return nil } func (p *Processor) processCreateAnnounceFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { - boostWrapperStatus, ok := clientMsg.GTSModel.(*gtsmodel.Status) + status, ok := clientMsg.GTSModel.(*gtsmodel.Status) if !ok { return errors.New("boost was not parseable as *gtsmodel.Status") } - if err := p.timelineAndNotifyStatus(ctx, boostWrapperStatus); err != nil { - return err + // Timeline and notify. + if err := p.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining boost: %w", err) } - if err := p.notifyAnnounce(ctx, boostWrapperStatus); err != nil { - return err + if err := p.notifyAnnounce(ctx, status); err != nil { + return gtserror.Newf("error notifying boost: %w", err) } - return p.federateAnnounce(ctx, boostWrapperStatus, clientMsg.OriginAccount, clientMsg.TargetAccount) + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.BoostOfID) + + if err := p.federateAnnounce(ctx, status, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil { + return gtserror.Newf("error federating boost: %w", err) + } + + return nil } func (p *Processor) processCreateBlockFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { @@ -293,50 +321,76 @@ func (p *Processor) processUndoBlockFromClientAPI(ctx context.Context, clientMsg } func (p *Processor) processUndoFaveFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { - fave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave) + statusFave, ok := clientMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { - return errors.New("undo was not parseable as *gtsmodel.StatusFave") + return gtserror.New("statusFave was not parseable as *gtsmodel.StatusFave") } - return p.federateUnfave(ctx, fave, clientMsg.OriginAccount, clientMsg.TargetAccount) + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + + if err := p.federateUnfave(ctx, statusFave, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil { + return gtserror.Newf("error federating status unfave: %w", err) + } + + return nil } func (p *Processor) processUndoAnnounceFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { - boost, ok := clientMsg.GTSModel.(*gtsmodel.Status) + status, ok := clientMsg.GTSModel.(*gtsmodel.Status) if !ok { - return errors.New("undo was not parseable as *gtsmodel.Status") + return errors.New("boost was not parseable as *gtsmodel.Status") } - if err := p.state.DB.DeleteStatusByID(ctx, boost.ID); err != nil { - return err + if err := p.state.DB.DeleteStatusByID(ctx, status.ID); err != nil { + return gtserror.Newf("db error deleting boost: %w", err) } - if err := p.deleteStatusFromTimelines(ctx, boost); err != nil { - return err + if err := p.deleteStatusFromTimelines(ctx, status.ID); err != nil { + return gtserror.Newf("error removing boost from timelines: %w", err) } - return p.federateUnannounce(ctx, boost, clientMsg.OriginAccount, clientMsg.TargetAccount) + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.BoostOfID) + + if err := p.federateUnannounce(ctx, status, clientMsg.OriginAccount, clientMsg.TargetAccount); err != nil { + return gtserror.Newf("error federating status unboost: %w", err) + } + + return nil } func (p *Processor) processDeleteStatusFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { - statusToDelete, ok := clientMsg.GTSModel.(*gtsmodel.Status) + status, ok := clientMsg.GTSModel.(*gtsmodel.Status) if !ok { - return errors.New("note was not parseable as *gtsmodel.Status") + return gtserror.New("status was not parseable as *gtsmodel.Status") } - if statusToDelete.Account == nil { - statusToDelete.Account = clientMsg.OriginAccount + if err := p.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("db error populating status: %w", err) } - // don't delete attachments, just unattach them; - // since this request comes from the client API - // and the poster might want to use the attachments - // again in a new post + // Don't delete attachments, just unattach them: this + // request comes from the client API and the poster + // may want to use attachments again in a new post. deleteAttachments := false - if err := p.wipeStatus(ctx, statusToDelete, deleteAttachments); err != nil { - return err + if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { + return gtserror.Newf("error wiping status: %w", err) } - return p.federateStatusDelete(ctx, statusToDelete) + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.federateStatusDelete(ctx, status); err != nil { + return gtserror.Newf("error federating status delete: %w", err) + } + + return nil } func (p *Processor) processDeleteAccountFromClientAPI(ctx context.Context, clientMsg messages.FromClientAPI) error { diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index fcb539087..5889da4f7 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -29,6 +29,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/timeline" ) @@ -419,7 +420,7 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta // delete all boosts for this status + remove them from timelines if boosts, err := p.state.DB.GetStatusReblogs(ctx, statusToDelete); err == nil { for _, b := range boosts { - if err := p.deleteStatusFromTimelines(ctx, b); err != nil { + if err := p.deleteStatusFromTimelines(ctx, b.ID); err != nil { return err } if err := p.state.DB.DeleteStatusByID(ctx, b.ID); err != nil { @@ -429,7 +430,7 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta } // delete this status from any and all timelines - if err := p.deleteStatusFromTimelines(ctx, statusToDelete); err != nil { + if err := p.deleteStatusFromTimelines(ctx, statusToDelete.ID); err != nil { return err } @@ -439,16 +440,36 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta // deleteStatusFromTimelines completely removes the given status from all timelines. // It will also stream deletion of the status to all open streams. -func (p *Processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error { - if err := p.state.Timelines.Home.WipeItemFromAllTimelines(ctx, status.ID); err != nil { +func (p *Processor) deleteStatusFromTimelines(ctx context.Context, statusID string) error { + if err := p.state.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil { return err } - if err := p.state.Timelines.List.WipeItemFromAllTimelines(ctx, status.ID); err != nil { + if err := p.state.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil { return err } - return p.stream.Delete(status.ID) + return p.stream.Delete(statusID) +} + +// invalidateStatusFromTimelines does cache invalidation on the given status by +// unpreparing it from all timelines, forcing it to be prepared again (with updated +// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes +// both for the status itself, and for any boosts of the status. +func (p *Processor) invalidateStatusFromTimelines(ctx context.Context, statusID string) { + if err := p.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { + log. + WithContext(ctx). + WithField("statusID", statusID). + Errorf("error unpreparing status from home timelines: %v", err) + } + + if err := p.state.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { + log. + WithContext(ctx). + WithField("statusID", statusID). + Errorf("error unpreparing status from list timelines: %v", err) + } } /* diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go index 17d60c77f..f165b2835 100644 --- a/internal/processing/fromfederator.go +++ b/internal/processing/fromfederator.go @@ -26,6 +26,7 @@ import ( "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -120,7 +121,7 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa // there's a gts model already pinned to the message, it should be a status if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok { - return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status") + return gtserror.New("Note was not parseable as *gtsmodel.Status") } // Since this was a create originating AP object @@ -140,7 +141,7 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa } else { // no model pinned, we need to dereference based on the IRI if federatorMsg.APIri == nil { - return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference") + return gtserror.New("status was not pinned to federatorMsg, and neither was an IRI for us to dereference") } status, _, err = p.federator.GetStatusByURI(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri) @@ -167,44 +168,35 @@ func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federa } } - return p.timelineAndNotifyStatus(ctx, status) + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining status: %w", err) + } + + return nil } -// processCreateFaveFromFederator handles Activity Create and Object Like +// processCreateFaveFromFederator handles Activity Create with Object Like. func (p *Processor) processCreateFaveFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave) + statusFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { - return errors.New("like was not parseable as *gtsmodel.StatusFave") + return gtserror.New("Like was not parseable as *gtsmodel.StatusFave") } - // make sure the account is pinned - if incomingFave.Account == nil { - a, err := p.state.DB.GetAccountByID(ctx, incomingFave.AccountID) - if err != nil { - return err - } - incomingFave.Account = a + if err := p.notifyFave(ctx, statusFave); err != nil { + return gtserror.Newf("error notifying status fave: %w", err) } - // Get the remote account to make sure the avi and header are cached. - if incomingFave.Account.Domain != "" { - remoteAccountID, err := url.Parse(incomingFave.Account.URI) - if err != nil { - return err - } + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, statusFave.StatusID) - a, _, err := p.federator.GetAccountByURI(ctx, - federatorMsg.ReceivingAccount.Username, - remoteAccountID, - ) - if err != nil { - return err - } - - incomingFave.Account = a - } - - return p.notifyFave(ctx, incomingFave) + return nil } // processCreateFollowRequestFromFederator handles Activity Create and Object Follow @@ -267,59 +259,43 @@ func (p *Processor) processCreateFollowRequestFromFederator(ctx context.Context, return p.notifyFollow(ctx, follow, followRequest.TargetAccount) } -// processCreateAnnounceFromFederator handles Activity Create and Object Announce +// processCreateAnnounceFromFederator handles Activity Create with Object Announce. func (p *Processor) processCreateAnnounceFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status) + status, ok := federatorMsg.GTSModel.(*gtsmodel.Status) if !ok { - return errors.New("announce was not parseable as *gtsmodel.Status") + return gtserror.New("Announce was not parseable as *gtsmodel.Status") } - // make sure the account is pinned - if incomingAnnounce.Account == nil { - a, err := p.state.DB.GetAccountByID(ctx, incomingAnnounce.AccountID) - if err != nil { - return err - } - incomingAnnounce.Account = a + // Dereference status that this status boosts. + if err := p.federator.DereferenceAnnounce(ctx, status, federatorMsg.ReceivingAccount.Username); err != nil { + return gtserror.Newf("error dereferencing announce: %w", err) } - // Get the remote account to make sure the avi and header are cached. - if incomingAnnounce.Account.Domain != "" { - remoteAccountID, err := url.Parse(incomingAnnounce.Account.URI) - if err != nil { - return err - } - - a, _, err := p.federator.GetAccountByURI(ctx, - federatorMsg.ReceivingAccount.Username, - remoteAccountID, - ) - if err != nil { - return err - } - - incomingAnnounce.Account = a - } - - if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil { - return fmt.Errorf("error dereferencing announce from federator: %s", err) - } - - incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt) + // Generate an ID for the boost wrapper status. + statusID, err := id.NewULIDFromTime(status.CreatedAt) if err != nil { - return err + return gtserror.Newf("error generating id: %w", err) } - incomingAnnounce.ID = incomingAnnounceID + status.ID = statusID - if err := p.state.DB.PutStatus(ctx, incomingAnnounce); err != nil { - return fmt.Errorf("error adding dereferenced announce to the db: %s", err) + // Store, timeline, and notify. + if err := p.state.DB.PutStatus(ctx, status); err != nil { + return gtserror.Newf("db error inserting status: %w", err) } - if err := p.timelineAndNotifyStatus(ctx, incomingAnnounce); err != nil { - return err + if err := p.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining status: %w", err) } - return p.notifyAnnounce(ctx, incomingAnnounce) + if err := p.notifyAnnounce(ctx, status); err != nil { + return gtserror.Newf("error notifying status: %w", err) + } + + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.ID) + + return nil } // processCreateBlockFromFederator handles Activity Create and Object Block @@ -384,16 +360,26 @@ func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, feder // processDeleteStatusFromFederator handles Activity Delete and Object Note func (p *Processor) processDeleteStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status) + status, ok := federatorMsg.GTSModel.(*gtsmodel.Status) if !ok { - return errors.New("note was not parseable as *gtsmodel.Status") + return errors.New("Note was not parseable as *gtsmodel.Status") } - // delete attachments from this status since this request + // Delete attachments from this status, since this request // comes from the federating API, and there's no way the - // poster can do a delete + redraft for it on our instance + // poster can do a delete + redraft for it on our instance. deleteAttachments := true - return p.wipeStatus(ctx, statusToDelete, deleteAttachments) + if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + return nil } // processDeleteAccountFromFederator handles Activity Delete and Object Profile diff --git a/internal/processing/status/bookmark.go b/internal/processing/status/bookmark.go index ea386b183..64e3fc1fd 100644 --- a/internal/processing/status/bookmark.go +++ b/internal/processing/status/bookmark.go @@ -53,7 +53,12 @@ func (p *Processor) BookmarkCreate(ctx context.Context, requestingAccount *gtsmo } if err := p.state.DB.PutStatusBookmark(ctx, gtsBookmark); err != nil { - err = fmt.Errorf("BookmarkCreate: error putting bookmark in database: %w", err) + err = gtserror.Newf("error putting bookmark in database: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil { + err = gtserror.Newf("error invalidating status from timelines: %w", err) return nil, gtserror.NewErrorInternalError(err) } @@ -74,7 +79,12 @@ func (p *Processor) BookmarkRemove(ctx context.Context, requestingAccount *gtsmo // We have a bookmark to remove. if err := p.state.DB.DeleteStatusBookmark(ctx, existingBookmarkID); err != nil { - err = fmt.Errorf("BookmarkRemove: error removing status bookmark: %w", err) + err = gtserror.Newf("error removing status bookmark: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil { + err = gtserror.Newf("error invalidating status from timelines: %w", err) return nil, gtserror.NewErrorInternalError(err) } diff --git a/internal/processing/status/common.go b/internal/processing/status/common.go index 1c08a1e65..e557563f3 100644 --- a/internal/processing/status/common.go +++ b/internal/processing/status/common.go @@ -21,15 +21,17 @@ import ( "context" "fmt" + "codeberg.org/gruf/go-kv" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" ) func (p *Processor) apiStatus(ctx context.Context, targetStatus *gtsmodel.Status, requestingAccount *gtsmodel.Account) (*apimodel.Status, gtserror.WithCode) { apiStatus, err := p.tc.StatusToAPIStatus(ctx, targetStatus, requestingAccount) if err != nil { - err = fmt.Errorf("error converting status %s to frontend representation: %w", targetStatus.ID, err) + err = gtserror.Newf("error converting status %s to frontend representation: %w", targetStatus.ID, err) return nil, gtserror.NewErrorInternalError(err) } @@ -66,3 +68,36 @@ func (p *Processor) getVisibleStatus(ctx context.Context, requestingAccount *gts return targetStatus, nil } + +// invalidateStatus is a shortcut function for invalidating the prepared/cached +// representation one status in the home timeline and all list timelines of the +// given accountID. It should only be called in cases where a status update +// does *not* need to be passed into the processor via the worker queue, since +// such invalidation will, in that case, be handled by the processor instead. +func (p *Processor) invalidateStatus(ctx context.Context, accountID string, statusID string) error { + // Get lists first + bail if this fails. + lists, err := p.state.DB.GetListsForAccountID(ctx, accountID) + if err != nil { + return gtserror.Newf("db error getting lists for account %s: %w", accountID, err) + } + + l := log.WithContext(ctx).WithFields(kv.Fields{ + {"accountID", accountID}, + {"statusID", statusID}, + }...) + + // Unprepare item from home + list timelines, just log + // if something goes wrong since this is not a showstopper. + + if err := p.state.Timelines.Home.UnprepareItem(ctx, accountID, statusID); err != nil { + l.Errorf("error unpreparing item from home timeline: %v", err) + } + + for _, list := range lists { + if err := p.state.Timelines.List.UnprepareItem(ctx, list.ID, statusID); err != nil { + l.Errorf("error unpreparing item from list timeline %s: %v", list.ID, err) + } + } + + return nil +} diff --git a/internal/processing/status/pin.go b/internal/processing/status/pin.go index 1e7dc40e8..c5981b699 100644 --- a/internal/processing/status/pin.go +++ b/internal/processing/status/pin.go @@ -95,7 +95,13 @@ func (p *Processor) PinCreate(ctx context.Context, requestingAccount *gtsmodel.A targetStatus.PinnedAt = time.Now() if err := p.state.DB.UpdateStatus(ctx, targetStatus, "pinned_at"); err != nil { - return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error pinning status: %w", err)) + err = gtserror.Newf("db error pinning status: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil { + err = gtserror.Newf("error invalidating status from timelines: %w", err) + return nil, gtserror.NewErrorInternalError(err) } return p.apiStatus(ctx, targetStatus, requestingAccount) @@ -118,11 +124,19 @@ func (p *Processor) PinRemove(ctx context.Context, requestingAccount *gtsmodel.A return nil, errWithCode } - if !targetStatus.PinnedAt.IsZero() { - targetStatus.PinnedAt = time.Time{} - if err := p.state.DB.UpdateStatus(ctx, targetStatus, "pinned_at"); err != nil { - return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error unpinning status: %w", err)) - } + if targetStatus.PinnedAt.IsZero() { + return p.apiStatus(ctx, targetStatus, requestingAccount) + } + + targetStatus.PinnedAt = time.Time{} + if err := p.state.DB.UpdateStatus(ctx, targetStatus, "pinned_at"); err != nil { + err = gtserror.Newf("db error unpinning status: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if err := p.invalidateStatus(ctx, requestingAccount.ID, targetStatusID); err != nil { + err = gtserror.Newf("error invalidating status from timelines: %w", err) + return nil, gtserror.NewErrorInternalError(err) } return p.apiStatus(ctx, targetStatus, requestingAccount) diff --git a/internal/timeline/manager.go b/internal/timeline/manager.go index 95a40aca1..a701756bb 100644 --- a/internal/timeline/manager.go +++ b/internal/timeline/manager.go @@ -75,6 +75,14 @@ type Manager interface { // WipeStatusesFromAccountID removes all items by the given accountID from the given timeline. WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error + // UnprepareItem unprepares/uncaches the prepared version fo the given itemID from the given timelineID. + // Use this for cache invalidation when the prepared representation of an item has changed. + UnprepareItem(ctx context.Context, timelineID string, itemID string) error + + // UnprepareItemFromAllTimelines unprepares/uncaches the prepared version of the given itemID from all timelines. + // Use this for cache invalidation when the prepared representation of an item has changed. + UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error + // Prune manually triggers a prune operation for the given timelineID. Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error) @@ -193,7 +201,7 @@ func (m *manager) WipeItemFromAllTimelines(ctx context.Context, itemID string) e }) if len(errors) > 0 { - return gtserror.Newf("one or more errors wiping status %s: %w", itemID, errors.Combine()) + return gtserror.Newf("error(s) wiping status %s: %w", itemID, errors.Combine()) } return nil @@ -204,6 +212,31 @@ func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineID string, return err } +func (m *manager) UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error { + errors := gtserror.MultiError{} + + // Work through all timelines held by this + // manager, and call Unprepare for each. + m.timelines.Range(func(_ any, v any) bool { + // nolint:forcetypeassert + if err := v.(Timeline).Unprepare(ctx, itemID); err != nil { + errors.Append(err) + } + + return true // always continue range + }) + + if len(errors) > 0 { + return gtserror.Newf("error(s) unpreparing status %s: %w", itemID, errors.Combine()) + } + + return nil +} + +func (m *manager) UnprepareItem(ctx context.Context, timelineID string, itemID string) error { + return m.getOrCreateTimeline(ctx, timelineID).Unprepare(ctx, itemID) +} + func (m *manager) Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error) { return m.getOrCreateTimeline(ctx, timelineID).Prune(desiredPreparedItemsLength, desiredIndexedItemsLength), nil } diff --git a/internal/timeline/timeline.go b/internal/timeline/timeline.go index b973a3905..e7c609638 100644 --- a/internal/timeline/timeline.go +++ b/internal/timeline/timeline.go @@ -78,12 +78,22 @@ type Timeline interface { INDEXING + PREPARATION FUNCTIONS */ - // IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its id, and then immediately prepares it. + // IndexAndPrepareOne puts a item into the timeline at the appropriate place + // according to its id, and then immediately prepares it. // - // The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false - // if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline. + // The returned bool indicates whether or not the item was actually inserted + // into the timeline. This will be false if the item is a boost and the original + // item, or a boost of it, already exists recently in the timeline. IndexAndPrepareOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) + // Unprepare clears the prepared version of the given item (and any boosts + // thereof) from the timeline, but leaves the indexed version in place. + // + // This is useful for cache invalidation when the prepared version of the + // item has changed for some reason (edits, updates, etc), but the item does + // not need to be removed: it will be prepared again next time Get is called. + Unprepare(ctx context.Context, itemID string) error + /* INFO FUNCTIONS */ diff --git a/internal/timeline/unprepare.go b/internal/timeline/unprepare.go new file mode 100644 index 000000000..827b274d8 --- /dev/null +++ b/internal/timeline/unprepare.go @@ -0,0 +1,50 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package timeline + +import ( + "context" +) + +func (t *timeline) Unprepare(ctx context.Context, itemID string) error { + t.Lock() + defer t.Unlock() + + if t.items == nil || t.items.data == nil { + // Nothing to do. + return nil + } + + for e := t.items.data.Front(); e != nil; e = e.Next() { + entry := e.Value.(*indexedItemsEntry) // nolint:forcetypeassert + + if entry.itemID != itemID && entry.boostOfID != itemID { + // Not relevant. + continue + } + + if entry.prepared == nil { + // It's already unprepared (mood). + continue + } + + entry.prepared = nil // <- eat this up please garbage collector nom nom nom + } + + return nil +} diff --git a/internal/timeline/unprepare_test.go b/internal/timeline/unprepare_test.go new file mode 100644 index 000000000..20bef7537 --- /dev/null +++ b/internal/timeline/unprepare_test.go @@ -0,0 +1,142 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package timeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" +) + +type UnprepareTestSuite struct { + TimelineStandardTestSuite +} + +func (suite *UnprepareTestSuite) TestUnprepareFromFave() { + var ( + ctx = context.Background() + testAccount = suite.testAccounts["local_account_1"] + maxID = "" + sinceID = "" + minID = "" + limit = 1 + local = false + ) + + suite.fillTimeline(testAccount.ID) + + // Get first status from the top (no params). + statuses, err := suite.state.Timelines.Home.GetTimeline( + ctx, + testAccount.ID, + maxID, + sinceID, + minID, + limit, + local, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + if len(statuses) != 1 { + suite.FailNow("couldn't get top status") + } + + targetStatus := statuses[0].(*apimodel.Status) + + // Check fave stats of the top status. + suite.Equal(0, targetStatus.FavouritesCount) + suite.False(targetStatus.Favourited) + + // Fave the top status from testAccount. + if err := suite.state.DB.PutStatusFave(ctx, >smodel.StatusFave{ + ID: id.NewULID(), + AccountID: testAccount.ID, + TargetAccountID: targetStatus.Account.ID, + StatusID: targetStatus.ID, + URI: "https://example.org/some/activity/path", + }); err != nil { + suite.FailNow(err.Error()) + } + + // Repeat call to get first status from the top. + // Get first status from the top (no params). + statuses, err = suite.state.Timelines.Home.GetTimeline( + ctx, + testAccount.ID, + maxID, + sinceID, + minID, + limit, + local, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + if len(statuses) != 1 { + suite.FailNow("couldn't get top status") + } + + targetStatus = statuses[0].(*apimodel.Status) + + // We haven't yet uncached/unprepared the status, + // we've only inserted the fave, so counts should + // stay the same... + suite.Equal(0, targetStatus.FavouritesCount) + suite.False(targetStatus.Favourited) + + // Now call unprepare. + suite.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, targetStatus.ID) + + // Now a Get should trigger a fresh prepare of the + // target status, and the counts should be updated. + // Repeat call to get first status from the top. + // Get first status from the top (no params). + statuses, err = suite.state.Timelines.Home.GetTimeline( + ctx, + testAccount.ID, + maxID, + sinceID, + minID, + limit, + local, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + if len(statuses) != 1 { + suite.FailNow("couldn't get top status") + } + + targetStatus = statuses[0].(*apimodel.Status) + + suite.Equal(1, targetStatus.FavouritesCount) + suite.True(targetStatus.Favourited) +} + +func TestUnprepareTestSuite(t *testing.T) { + suite.Run(t, new(UnprepareTestSuite)) +}