From 3510454768b1877540c6dc25f4967e4b608203a8 Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Thu, 6 Apr 2023 13:43:13 +0200 Subject: [PATCH] [bugfix/chore] Refactor timeline code (#1656) * start poking timelines * OK yes we're refactoring, but it's nothing like the last time so don't worry * more fiddling * update tests, simplify Get * thanks linter, you're the best, mwah mwah kisses * do a bit more tidying up * start buggering about with the prepare function * fix little oopsie * start merging lists into 1 * ik heb een heel zwaar leven nee nee echt waar * hey it works we did it reddit * regenerate swagger docs * tidy up a wee bit * adjust paging * fix little error, remove unused functions --- docs/api/swagger.yaml | 4 +- internal/api/client/timelines/home.go | 4 +- internal/db/bundb/timeline.go | 36 +- internal/db/bundb/timeline_test.go | 26 + internal/processing/fromcommon.go | 4 +- internal/processing/statustimeline.go | 150 ++--- internal/timeline/get.go | 627 +++++++++--------- internal/timeline/get_test.go | 420 +++++++----- internal/timeline/index.go | 315 ++++++--- internal/timeline/index_test.go | 19 +- internal/timeline/indexeditems.go | 84 ++- internal/timeline/manager.go | 234 +++---- internal/timeline/manager_test.go | 40 +- internal/timeline/preparable.go | 25 - internal/timeline/prepare.go | 304 +++------ internal/timeline/prepareditems.go | 91 --- internal/timeline/prune.go | 88 ++- internal/timeline/prune_test.go | 25 +- internal/timeline/remove.go | 114 ++-- internal/timeline/timeline.go | 58 +- internal/timeline/timeline_test.go | 6 +- .../timeline/{timelineable.go => types.go} | 10 +- 22 files changed, 1319 insertions(+), 1365 deletions(-) delete mode 100644 internal/timeline/preparable.go delete mode 100644 internal/timeline/prepareditems.go rename internal/timeline/{timelineable.go => types.go} (77%) diff --git a/docs/api/swagger.yaml b/docs/api/swagger.yaml index e0a61012a..44e0cba15 100644 --- a/docs/api/swagger.yaml +++ b/docs/api/swagger.yaml @@ -5582,11 +5582,11 @@ paths: in: query name: max_id type: string - - description: Return only statuses *NEWER* than the given since status ID. The status with the specified ID will not be included in the response. + - description: Return only statuses *newer* than the given since status ID. The status with the specified ID will not be included in the response. in: query name: since_id type: string - - description: Return only statuses *NEWER* than the given since status ID. The status with the specified ID will not be included in the response. + - description: Return only statuses *immediately newer* than the given since status ID. The status with the specified ID will not be included in the response. in: query name: min_id type: string diff --git a/internal/api/client/timelines/home.go b/internal/api/client/timelines/home.go index 8ca4a96eb..f63d14fd3 100644 --- a/internal/api/client/timelines/home.go +++ b/internal/api/client/timelines/home.go @@ -62,14 +62,14 @@ import ( // name: since_id // type: string // description: >- -// Return only statuses *NEWER* than the given since status ID. +// Return only statuses *newer* than the given since status ID. // The status with the specified ID will not be included in the response. // in: query // - // name: min_id // type: string // description: >- -// Return only statuses *NEWER* than the given since status ID. +// Return only statuses *immediately newer* than the given since status ID. // The status with the specified ID will not be included in the response. // in: query // required: false diff --git a/internal/db/bundb/timeline.go b/internal/db/bundb/timeline.go index 1ab140103..fabfe2797 100644 --- a/internal/db/bundb/timeline.go +++ b/internal/db/bundb/timeline.go @@ -42,7 +42,10 @@ func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxI } // Make educated guess for slice size - statusIDs := make([]string, 0, limit) + var ( + statusIDs = make([]string, 0, limit) + frontToBack = true + ) q := t.conn. NewSelect(). @@ -56,11 +59,9 @@ func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxI bun.Ident("follow.target_account_id"), bun.Ident("status.account_id"), bun.Ident("follow.account_id"), - accountID). - // Sort by highest ID (newest) to lowest ID (oldest) - Order("status.id DESC") + accountID) - if maxID == "" { + if maxID == "" || maxID == id.Highest { const future = 24 * time.Hour var err error @@ -83,6 +84,9 @@ func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxI if minID != "" { // return only statuses HIGHER (ie., newer) than minID q = q.Where("? > ?", bun.Ident("status.id"), minID) + + // page up + frontToBack = false } if local { @@ -95,6 +99,14 @@ func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxI q = q.Limit(limit) } + if frontToBack { + // Page down. + q = q.Order("status.id DESC") + } else { + // Page up. + q = q.Order("status.id ASC") + } + // Use a WhereGroup here to specify that we want EITHER statuses posted by accounts that accountID follows, // OR statuses posted by accountID itself (since a user should be able to see their own statuses). // @@ -110,8 +122,20 @@ func (t *timelineDB) GetHomeTimeline(ctx context.Context, accountID string, maxI return nil, t.conn.ProcessError(err) } - statuses := make([]*gtsmodel.Status, 0, len(statusIDs)) + if len(statusIDs) == 0 { + return nil, nil + } + // If we're paging up, we still want statuses + // to be sorted by ID desc, so reverse ids slice. + // https://zchee.github.io/golang-wiki/SliceTricks/#reversing + if !frontToBack { + for l, r := 0, len(statusIDs)-1; l < r; l, r = l+1, r-1 { + statusIDs[l], statusIDs[r] = statusIDs[r], statusIDs[l] + } + } + + statuses := make([]*gtsmodel.Status, 0, len(statusIDs)) for _, id := range statusIDs { // Fetch status from db for ID status, err := t.state.DB.GetStatusByID(ctx, id) diff --git a/internal/db/bundb/timeline_test.go b/internal/db/bundb/timeline_test.go index d6632b38c..f954c78dd 100644 --- a/internal/db/bundb/timeline_test.go +++ b/internal/db/bundb/timeline_test.go @@ -100,6 +100,32 @@ func (suite *TimelineTestSuite) TestGetHomeTimelineWithFutureStatus() { suite.Len(s, 16) } +func (suite *TimelineTestSuite) TestGetHomeTimelineBackToFront() { + ctx := context.Background() + + viewingAccount := suite.testAccounts["local_account_1"] + + s, err := suite.db.GetHomeTimeline(ctx, viewingAccount.ID, "", "", id.Lowest, 5, false) + suite.NoError(err) + + suite.Len(s, 5) + suite.Equal("01F8MHAYFKS4KMXF8K5Y1C0KRN", s[0].ID) + suite.Equal("01F8MH75CBF9JFX4ZAD54N0W0R", s[len(s)-1].ID) +} + +func (suite *TimelineTestSuite) TestGetHomeTimelineFromHighest() { + ctx := context.Background() + + viewingAccount := suite.testAccounts["local_account_1"] + + s, err := suite.db.GetHomeTimeline(ctx, viewingAccount.ID, id.Highest, "", "", 5, false) + suite.NoError(err) + + suite.Len(s, 5) + suite.Equal("01G36SF3V6Y6V5BF9P4R7PQG7G", s[0].ID) + suite.Equal("01FCTA44PW9H1TB328S9AQXKDS", s[len(s)-1].ID) +} + func getFutureStatus() *gtsmodel.Status { theDistantFuture := time.Now().Add(876600 * time.Hour) id, err := id.NewULIDFromTime(theDistantFuture) diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 93d61c533..45c637978 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -455,8 +455,8 @@ func (p *Processor) timelineStatusForAccount(ctx context.Context, account *gtsmo return nil } - // stick the status in the timeline for the account and then immediately prepare it so they can see it right away - if inserted, err := p.statusTimelines.IngestAndPrepare(ctx, status, account.ID); err != nil { + // stick the status in the timeline for the account + if inserted, err := p.statusTimelines.IngestOne(ctx, account.ID, status); err != nil { return fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %w", status.ID, err) } else if !inserted { return nil diff --git a/internal/processing/statustimeline.go b/internal/processing/statustimeline.go index 4e46b59dc..39c5272b6 100644 --- a/internal/processing/statustimeline.go +++ b/internal/processing/statustimeline.go @@ -41,10 +41,10 @@ func StatusGrabFunction(database db.DB) timeline.GrabFunction { return func(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { statuses, err := database.GetHomeTimeline(ctx, timelineAccountID, maxID, sinceID, minID, limit, false) if err != nil { - if err == db.ErrNoEntries { + if errors.Is(err, db.ErrNoEntries) { return nil, true, nil // we just don't have enough statuses left in the db so return stop = true } - return nil, false, fmt.Errorf("statusGrabFunction: error getting statuses from db: %s", err) + return nil, false, fmt.Errorf("statusGrabFunction: error getting statuses from db: %w", err) } items := make([]timeline.Timelineable, len(statuses)) @@ -61,20 +61,20 @@ func StatusFilterFunction(database db.DB, filter *visibility.Filter) timeline.Fi return func(ctx context.Context, timelineAccountID string, item timeline.Timelineable) (shouldIndex bool, err error) { status, ok := item.(*gtsmodel.Status) if !ok { - return false, errors.New("statusFilterFunction: could not convert item to *gtsmodel.Status") + return false, errors.New("StatusFilterFunction: could not convert item to *gtsmodel.Status") } requestingAccount, err := database.GetAccountByID(ctx, timelineAccountID) if err != nil { - return false, fmt.Errorf("statusFilterFunction: error getting account with id %s", timelineAccountID) + return false, fmt.Errorf("StatusFilterFunction: error getting account with id %s: %w", timelineAccountID, err) } timelineable, err := filter.StatusHomeTimelineable(ctx, requestingAccount, status) if err != nil { - log.Warnf(ctx, "error checking hometimelineability of status %s for account %s: %s", status.ID, timelineAccountID, err) + return false, fmt.Errorf("StatusFilterFunction: error checking hometimelineability of status %s for account %s: %w", status.ID, timelineAccountID, err) } - return timelineable, nil // we don't return the error here because we want to just skip this item if something goes wrong + return timelineable, nil } } @@ -83,12 +83,12 @@ func StatusPrepareFunction(database db.DB, tc typeutils.TypeConverter) timeline. return func(ctx context.Context, timelineAccountID string, itemID string) (timeline.Preparable, error) { status, err := database.GetStatusByID(ctx, itemID) if err != nil { - return nil, fmt.Errorf("statusPrepareFunction: error getting status with id %s", itemID) + return nil, fmt.Errorf("StatusPrepareFunction: error getting status with id %s: %w", itemID, err) } requestingAccount, err := database.GetAccountByID(ctx, timelineAccountID) if err != nil { - return nil, fmt.Errorf("statusPrepareFunction: error getting account with id %s", timelineAccountID) + return nil, fmt.Errorf("StatusPrepareFunction: error getting account with id %s: %w", timelineAccountID, err) } return tc.StatusToAPIStatus(ctx, status, requestingAccount) @@ -137,21 +137,24 @@ func StatusSkipInsertFunction() timeline.SkipInsertFunction { } func (p *Processor) HomeTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { - preparedItems, err := p.statusTimelines.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) + statuses, err := p.statusTimelines.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) if err != nil { + err = fmt.Errorf("HomeTimelineGet: error getting statuses: %w", err) return nil, gtserror.NewErrorInternalError(err) } - count := len(preparedItems) - + count := len(statuses) if count == 0 { return util.EmptyPageableResponse(), nil } - items := []interface{}{} - nextMaxIDValue := "" - prevMinIDValue := "" - for i, item := range preparedItems { + var ( + items = make([]interface{}, count) + nextMaxIDValue string + prevMinIDValue string + ) + + for i, item := range statuses { if i == count-1 { nextMaxIDValue = item.GetID() } @@ -159,7 +162,8 @@ func (p *Processor) HomeTimelineGet(ctx context.Context, authed *oauth.Auth, max if i == 0 { prevMinIDValue = item.GetID() } - items = append(items, item) + + items[i] = item } return util.PackagePageableResponse(util.PageableResponseParams{ @@ -174,37 +178,54 @@ func (p *Processor) HomeTimelineGet(ctx context.Context, authed *oauth.Auth, max func (p *Processor) PublicTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { statuses, err := p.state.DB.GetPublicTimeline(ctx, maxID, sinceID, minID, limit, local) if err != nil { - if err == db.ErrNoEntries { - // there are just no entries left + if errors.Is(err, db.ErrNoEntries) { + // No statuses (left) in public timeline. return util.EmptyPageableResponse(), nil } - // there's an actual error + // An actual error has occurred. + err = fmt.Errorf("PublicTimelineGet: db error getting statuses: %w", err) return nil, gtserror.NewErrorInternalError(err) } - filtered, err := p.filterPublicStatuses(ctx, authed, statuses) - if err != nil { - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(filtered) - + count := len(statuses) if count == 0 { return util.EmptyPageableResponse(), nil } - items := []interface{}{} - nextMaxIDValue := "" - prevMinIDValue := "" - for i, item := range filtered { + var ( + items = make([]interface{}, 0, count) + nextMaxIDValue string + prevMinIDValue string + ) + + for i, s := range statuses { + // Set next + prev values before filtering and API + // converting, so caller can still page properly. if i == count-1 { - nextMaxIDValue = item.GetID() + nextMaxIDValue = s.ID } if i == 0 { - prevMinIDValue = item.GetID() + prevMinIDValue = s.ID } - items = append(items, item) + + timelineable, err := p.filter.StatusPublicTimelineable(ctx, authed.Account, s) + if err != nil { + log.Debugf(ctx, "skipping status %s because of an error checking StatusPublicTimelineable: %s", s.ID, err) + continue + } + + if !timelineable { + continue + } + + apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) + if err != nil { + log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) + continue + } + + items = append(items, apiStatus) } return util.PackagePageableResponse(util.PageableResponseParams{ @@ -219,26 +240,29 @@ func (p *Processor) PublicTimelineGet(ctx context.Context, authed *oauth.Auth, m func (p *Processor) FavedTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { statuses, nextMaxID, prevMinID, err := p.state.DB.GetFavedTimeline(ctx, authed.Account.ID, maxID, minID, limit) if err != nil { - if err == db.ErrNoEntries { - // there are just no entries left + if errors.Is(err, db.ErrNoEntries) { + // There are just no entries (left). return util.EmptyPageableResponse(), nil } - // there's an actual error + // An actual error has occurred. + err = fmt.Errorf("FavedTimelineGet: db error getting statuses: %w", err) return nil, gtserror.NewErrorInternalError(err) } + count := len(statuses) + if count == 0 { + return util.EmptyPageableResponse(), nil + } + filtered, err := p.filterFavedStatuses(ctx, authed, statuses) if err != nil { + err = fmt.Errorf("FavedTimelineGet: error filtering statuses: %w", err) return nil, gtserror.NewErrorInternalError(err) } - if len(filtered) == 0 { - return util.EmptyPageableResponse(), nil - } - - items := []interface{}{} - for _, item := range filtered { - items = append(items, item) + items := make([]interface{}, len(filtered)) + for i, item := range filtered { + items[i] = item } return util.PackagePageableResponse(util.PageableResponseParams{ @@ -250,47 +274,17 @@ func (p *Processor) FavedTimelineGet(ctx context.Context, authed *oauth.Auth, ma }) } -func (p *Processor) filterPublicStatuses(ctx context.Context, authed *oauth.Auth, statuses []*gtsmodel.Status) ([]*apimodel.Status, error) { - apiStatuses := []*apimodel.Status{} - for _, s := range statuses { - if _, err := p.state.DB.GetAccountByID(ctx, s.AccountID); err != nil { - if err == db.ErrNoEntries { - log.Debugf(ctx, "skipping status %s because account %s can't be found in the db", s.ID, s.AccountID) - continue - } - return nil, gtserror.NewErrorInternalError(fmt.Errorf("filterPublicStatuses: error getting status author: %s", err)) - } - - timelineable, err := p.filter.StatusPublicTimelineable(ctx, authed.Account, s) - if err != nil { - log.Debugf(ctx, "skipping status %s because of an error checking status visibility: %s", s.ID, err) - continue - } - if !timelineable { - continue - } - - apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) - if err != nil { - log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) - continue - } - - apiStatuses = append(apiStatuses, apiStatus) - } - - return apiStatuses, nil -} - func (p *Processor) filterFavedStatuses(ctx context.Context, authed *oauth.Auth, statuses []*gtsmodel.Status) ([]*apimodel.Status, error) { - apiStatuses := []*apimodel.Status{} + apiStatuses := make([]*apimodel.Status, 0, len(statuses)) + for _, s := range statuses { if _, err := p.state.DB.GetAccountByID(ctx, s.AccountID); err != nil { - if err == db.ErrNoEntries { + if errors.Is(err, db.ErrNoEntries) { log.Debugf(ctx, "skipping status %s because account %s can't be found in the db", s.ID, s.AccountID) continue } - return nil, gtserror.NewErrorInternalError(fmt.Errorf("filterPublicStatuses: error getting status author: %s", err)) + err = fmt.Errorf("filterFavedStatuses: db error getting status author: %w", err) + return nil, gtserror.NewErrorInternalError(err) } timelineable, err := p.filter.StatusVisible(ctx, authed.Account, s) diff --git a/internal/timeline/get.go b/internal/timeline/get.go index f6f885d92..4ca9023f2 100644 --- a/internal/timeline/get.go +++ b/internal/timeline/get.go @@ -25,11 +25,11 @@ import ( "time" "codeberg.org/gruf/go-kv" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" ) -const retries = 5 - func (t *timeline) LastGot() time.Time { t.Lock() defer t.Unlock() @@ -47,339 +47,368 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st }...) l.Trace("entering get and updating t.lastGot") - // regardless of what happens below, update the - // last time Get was called for this timeline + // Regardless of what happens below, update the + // last time Get was called for this timeline. t.Lock() t.lastGot = time.Now() t.Unlock() - var items []Preparable - var err error + var ( + items []Preparable + err error + ) - // no params are defined to just fetch from the top - // this is equivalent to a user asking for the top x items from their timeline - if maxID == "" && sinceID == "" && minID == "" { - items, err = t.getXFromTop(ctx, amount) - // aysnchronously prepare the next predicted query so it's ready when the user asks for it - if len(items) != 0 { + switch { + case maxID == "" && sinceID == "" && minID == "": + // No params are defined so just fetch from the top. + // This is equivalent to a user starting to view + // their timeline from newest -> older posts. + items, err = t.getXBetweenIDs(ctx, amount, id.Highest, id.Lowest, true) + + // Cache expected next query to speed up scrolling. + // Assume the user will be scrolling downwards from + // the final ID in items. + if prepareNext && err == nil && len(items) != 0 { nextMaxID := items[len(items)-1].GetID() - if prepareNext { - // already cache the next query to speed up scrolling - go func() { - // use context.Background() because we don't want the query to abort when the request finishes - if err := t.prepareNextQuery(context.Background(), amount, nextMaxID, "", ""); err != nil { - l.Errorf("error preparing next query: %s", err) - } - }() - } + t.prepareNextQuery(amount, nextMaxID, "", "") } - } - // maxID is defined but sinceID isn't so take from behind - // this is equivalent to a user asking for the next x items from their timeline, starting from maxID - if maxID != "" && sinceID == "" { - attempts := 0 - items, err = t.getXBehindID(ctx, amount, maxID, &attempts) - // aysnchronously prepare the next predicted query so it's ready when the user asks for it - if len(items) != 0 { + case maxID != "" && sinceID == "" && minID == "": + // Only maxID is defined, so fetch from maxID onwards. + // This is equivalent to a user paging further down + // their timeline from newer -> older posts. + items, err = t.getXBetweenIDs(ctx, amount, maxID, id.Lowest, true) + + // Cache expected next query to speed up scrolling. + // Assume the user will be scrolling downwards from + // the final ID in items. + if prepareNext && err == nil && len(items) != 0 { nextMaxID := items[len(items)-1].GetID() - if prepareNext { - // already cache the next query to speed up scrolling - go func() { - // use context.Background() because we don't want the query to abort when the request finishes - if err := t.prepareNextQuery(context.Background(), amount, nextMaxID, "", ""); err != nil { - l.Errorf("error preparing next query: %s", err) - } - }() - } + t.prepareNextQuery(amount, nextMaxID, "", "") } - } - // maxID is defined and sinceID || minID are as well, so take a slice between them - // this is equivalent to a user asking for items older than x but newer than y - if maxID != "" && sinceID != "" { - items, err = t.getXBetweenID(ctx, amount, maxID, minID) - } - if maxID != "" && minID != "" { - items, err = t.getXBetweenID(ctx, amount, maxID, minID) - } + // In the next cases, maxID is defined, and so are + // either sinceID or minID. This is equivalent to + // a user opening an in-progress timeline and asking + // for a slice of posts somewhere in the middle, or + // trying to "fill in the blanks" between two points, + // paging either up or down. + case maxID != "" && sinceID != "": + items, err = t.getXBetweenIDs(ctx, amount, maxID, sinceID, true) - // maxID isn't defined, but sinceID || minID are, so take x before - // this is equivalent to a user asking for items newer than x (eg., refreshing the top of their timeline) - if maxID == "" && sinceID != "" { - items, err = t.getXBeforeID(ctx, amount, sinceID, true) - } - if maxID == "" && minID != "" { - items, err = t.getXBeforeID(ctx, amount, minID, true) + // Cache expected next query to speed up scrolling. + // We can assume the caller is scrolling downwards. + // Guess id.Lowest as sinceID, since we don't actually + // know what the next sinceID would be. + if prepareNext && err == nil && len(items) != 0 { + nextMaxID := items[len(items)-1].GetID() + t.prepareNextQuery(amount, nextMaxID, id.Lowest, "") + } + + case maxID != "" && minID != "": + items, err = t.getXBetweenIDs(ctx, amount, maxID, minID, false) + + // Cache expected next query to speed up scrolling. + // We can assume the caller is scrolling upwards. + // Guess id.Highest as maxID, since we don't actually + // know what the next maxID would be. + if prepareNext && err == nil && len(items) != 0 { + prevMinID := items[0].GetID() + t.prepareNextQuery(amount, id.Highest, "", prevMinID) + } + + // In the final cases, maxID is not defined, but + // either sinceID or minID are. This is equivalent to + // a user either "pulling up" at the top of their timeline + // to refresh it and check if newer posts have come in, or + // trying to scroll upwards from an old post to see what + // they missed since then. + // + // In these calls, we use the highest possible ulid as + // behindID because we don't have a cap for newest that + // we're interested in. + case maxID == "" && sinceID != "": + items, err = t.getXBetweenIDs(ctx, amount, id.Highest, sinceID, true) + + // We can't cache an expected next query for this one, + // since presumably the caller is at the top of their + // timeline already. + + case maxID == "" && minID != "": + items, err = t.getXBetweenIDs(ctx, amount, id.Highest, minID, false) + + // Cache expected next query to speed up scrolling. + // We can assume the caller is scrolling upwards. + // Guess id.Highest as maxID, since we don't actually + // know what the next maxID would be. + if prepareNext && err == nil && len(items) != 0 { + prevMinID := items[0].GetID() + t.prepareNextQuery(amount, id.Highest, "", prevMinID) + } + + default: + err = errors.New("Get: switch statement exhausted with no results") } return items, err } -// getXFromTop returns x amount of items from the top of the timeline, from newest to oldest. -func (t *timeline) getXFromTop(ctx context.Context, amount int) ([]Preparable, error) { - // make a slice of preparedItems with the length we need to return - preparedItems := make([]Preparable, 0, amount) +// getXBetweenIDs returns x amount of items somewhere between (not including) the given IDs. +// +// If frontToBack is true, items will be served paging down from behindID. +// This corresponds to an api call to /timelines/home?max_id=WHATEVER&since_id=WHATEVER +// +// If frontToBack is false, items will be served paging up from beforeID. +// This corresponds to an api call to /timelines/home?max_id=WHATEVER&min_id=WHATEVER +func (t *timeline) getXBetweenIDs(ctx context.Context, amount int, behindID string, beforeID string, frontToBack bool) ([]Preparable, error) { + l := log. + WithContext(ctx). + WithFields(kv.Fields{ + {"amount", amount}, + {"behindID", behindID}, + {"beforeID", beforeID}, + {"frontToBack", frontToBack}, + }...) + l.Trace("entering getXBetweenID") - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} + // Assume length we need to return. + items := make([]Preparable, 0, amount) + + if beforeID >= behindID { + // This is an impossible situation, we + // can't serve anything between these. + return items, nil } - // make sure we have enough items prepared to return - if t.preparedItems.data.Len() < amount { - if err := t.PrepareFromTop(ctx, amount); err != nil { + // Try to ensure we have enough items prepared. + if err := t.prepareXBetweenIDs(ctx, amount, behindID, beforeID, frontToBack); err != nil { + // An error here doesn't necessarily mean we + // can't serve anything, so log + keep going. + l.Debugf("error calling prepareXBetweenIDs: %s", err) + } + + var ( + beforeIDMark *list.Element + served int + // Our behavior while ranging through the + // list changes depending on if we're + // going front-to-back or back-to-front. + // + // To avoid checking which one we're doing + // in each loop iteration, define our range + // function here outside the loop. + // + // The bool indicates to the caller whether + // iteration should continue (true) or stop + // (false). + rangeF func(e *list.Element) (bool, error) + // If we get certain errors on entries as we're + // looking through, we might want to cheekily + // remove their elements from the timeline. + // Everything added to this slice will be removed. + removeElements = []*list.Element{} + ) + + defer func() { + for _, e := range removeElements { + t.items.data.Remove(e) + } + }() + + if frontToBack { + // We're going front-to-back, which means we + // don't need to look for a mark per se, we + // just keep serving items until we've reached + // a point where the items are out of the range + // we're interested in. + rangeF = func(e *list.Element) (bool, error) { + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + + if entry.itemID >= behindID { + // ID of this item is too high, + // just keep iterating. + l.Trace("item is too new, continuing") + return true, nil + } + + if entry.itemID <= beforeID { + // We've gone as far as we can through + // the list and reached entries that are + // now too old for us, stop here. + l.Trace("reached older items, breaking") + return false, nil + } + + l.Trace("entry is just right") + + if entry.prepared == nil { + // Whoops, this entry isn't prepared yet; some + // race condition? That's OK, we can do it now. + prepared, err := t.prepareFunction(ctx, t.accountID, entry.itemID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // ErrNoEntries means something has been deleted, + // so we'll likely not be able to ever prepare this. + // This means we can remove it and skip past it. + l.Debugf("db.ErrNoEntries while trying to prepare %s; will remove from timeline", entry.itemID) + removeElements = append(removeElements, e) + return true, nil + } + // We've got a proper db error. + return false, fmt.Errorf("getXBetweenIDs: db error while trying to prepare %s: %w", entry.itemID, err) + } + entry.prepared = prepared + } + + items = append(items, entry.prepared) + + served++ + return served < amount, nil + } + } else { + // Iterate through the list from the top, until + // we reach an item with id smaller than beforeID; + // ie., an item OLDER than beforeID. At that point, + // we can stop looking because we're not interested + // in older entries. + rangeF = func(e *list.Element) (bool, error) { + // Move the mark back one place each loop. + beforeIDMark = e + + //nolint:forcetypeassert + if entry := e.Value.(*indexedItemsEntry); entry.itemID <= beforeID { + // We've gone as far as we can through + // the list and reached entries that are + // now too old for us, stop here. + l.Trace("reached older items, breaking") + return false, nil + } + + return true, nil + } + } + + // Iterate through the list until the function + // we defined above instructs us to stop. + for e := t.items.data.Front(); e != nil; e = e.Next() { + keepGoing, err := rangeF(e) + if err != nil { return nil, err } + + if !keepGoing { + break + } } - // work through the prepared items from the top and return - var served int - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXFromTop: could not parse e as a preparedItemsEntry") + if frontToBack || beforeIDMark == nil { + // If we're serving front to back, then + // items should be populated by now. If + // we're serving back to front but didn't + // find any items newer than beforeID, + // we can just return empty items. + return items, nil + } + + // We're serving back to front, so iterate upwards + // towards the front of the list from the mark we found, + // until we either get to the front, serve enough + // items, or reach behindID. + // + // To preserve ordering, we need to reverse the slice + // when we're finished. + for e := beforeIDMark; e != nil; e = e.Prev() { + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + + if entry.itemID == beforeID { + // Don't include the beforeID + // entry itself, just continue. + l.Trace("entry item ID is equal to beforeID, skipping") + continue } - preparedItems = append(preparedItems, entry.prepared) + + if entry.itemID >= behindID { + // We've reached items that are + // newer than what we're looking + // for, just stop here. + l.Trace("reached newer items, breaking") + break + } + + if entry.prepared == nil { + // Whoops, this entry isn't prepared yet; some + // race condition? That's OK, we can do it now. + prepared, err := t.prepareFunction(ctx, t.accountID, entry.itemID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // ErrNoEntries means something has been deleted, + // so we'll likely not be able to ever prepare this. + // This means we can remove it and skip past it. + l.Debugf("db.ErrNoEntries while trying to prepare %s; will remove from timeline", entry.itemID) + removeElements = append(removeElements, e) + continue + } + // We've got a proper db error. + return nil, fmt.Errorf("getXBetweenIDs: db error while trying to prepare %s: %w", entry.itemID, err) + } + entry.prepared = prepared + } + + items = append(items, entry.prepared) + served++ if served >= amount { break } } - return preparedItems, nil + // Reverse order of items. + // https://zchee.github.io/golang-wiki/SliceTricks/#reversing + for l, r := 0, len(items)-1; l < r; l, r = l+1, r-1 { + items[l], items[r] = items[r], items[l] + } + + return items, nil } -// getXBehindID returns x amount of items from the given id onwards, from newest to oldest. -// This will NOT include the item with the given ID. -// -// This corresponds to an api call to /timelines/home?max_id=WHATEVER -func (t *timeline) getXBehindID(ctx context.Context, amount int, behindID string, attempts *int) ([]Preparable, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"amount", amount}, - {"behindID", behindID}, - {"attempts", attempts}, - }...) +func (t *timeline) prepareNextQuery(amount int, maxID string, sinceID string, minID string) { + var ( + // We explicitly use context.Background() rather than + // accepting a context param because we don't want this + // to stop/break when the calling context finishes. + ctx = context.Background() + err error + ) - newAttempts := *attempts - newAttempts++ - attempts = &newAttempts - - // make a slice of items with the length we need to return - items := make([]Preparable, 0, amount) - - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - } - - // iterate through the modified list until we hit the mark we're looking for - var position int - var behindIDMark *list.Element - -findMarkLoop: - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - position++ - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry") + // Always perform this async so caller doesn't have to wait. + go func() { + switch { + case maxID == "" && sinceID == "" && minID == "": + err = t.prepareXBetweenIDs(ctx, amount, id.Highest, id.Lowest, true) + case maxID != "" && sinceID == "" && minID == "": + err = t.prepareXBetweenIDs(ctx, amount, maxID, id.Lowest, true) + case maxID != "" && sinceID != "": + err = t.prepareXBetweenIDs(ctx, amount, maxID, sinceID, true) + case maxID != "" && minID != "": + err = t.prepareXBetweenIDs(ctx, amount, maxID, minID, false) + case maxID == "" && sinceID != "": + err = t.prepareXBetweenIDs(ctx, amount, id.Highest, sinceID, true) + case maxID == "" && minID != "": + err = t.prepareXBetweenIDs(ctx, amount, id.Highest, minID, false) + default: + err = errors.New("Get: switch statement exhausted with no results") } - if entry.itemID <= behindID { - l.Trace("found behindID mark") - behindIDMark = e - break findMarkLoop - } - } - - // we didn't find it, so we need to make sure it's indexed and prepared and then try again - // this can happen when a user asks for really old items - if behindIDMark == nil { - if err := t.prepareBehind(ctx, behindID, amount); err != nil { - return nil, fmt.Errorf("getXBehindID: error preparing behind and including ID %s", behindID) - } - oldestID, err := t.oldestPreparedItemID(ctx) if err != nil { - return nil, err + log. + WithContext(ctx). + WithFields(kv.Fields{ + {"amount", amount}, + {"maxID", maxID}, + {"sinceID", sinceID}, + {"minID", minID}, + }...). + Warnf("error preparing next query: %s", err) } - if oldestID == "" { - l.Tracef("oldestID is empty so we can't return behindID %s", behindID) - return items, nil - } - if oldestID == behindID { - l.Tracef("given behindID %s is the same as oldestID %s so there's nothing to return behind it", behindID, oldestID) - return items, nil - } - if *attempts > retries { - l.Tracef("exceeded retries looking for behindID %s", behindID) - return items, nil - } - l.Trace("trying getXBehindID again") - return t.getXBehindID(ctx, amount, behindID, attempts) - } - - // make sure we have enough items prepared behind it to return what we're being asked for - if t.preparedItems.data.Len() < amount+position { - if err := t.prepareBehind(ctx, behindID, amount); err != nil { - return nil, err - } - } - - // start serving from the entry right after the mark - var served int -serveloop: - for e := behindIDMark.Next(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry") - } - - // serve up to the amount requested - items = append(items, entry.prepared) - served++ - if served >= amount { - break serveloop - } - } - - return items, nil -} - -// getXBeforeID returns x amount of items up to the given id, from newest to oldest. -// This will NOT include the item with the given ID. -// -// This corresponds to an api call to /timelines/home?since_id=WHATEVER -func (t *timeline) getXBeforeID(ctx context.Context, amount int, beforeID string, startFromTop bool) ([]Preparable, error) { - // make a slice of items with the length we need to return - items := make([]Preparable, 0, amount) - - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - } - - // iterate through the modified list until we hit the mark we're looking for, or as close as possible to it - var beforeIDMark *list.Element -findMarkLoop: - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") - } - - if entry.itemID >= beforeID { - beforeIDMark = e - } else { - break findMarkLoop - } - } - - if beforeIDMark == nil { - return items, nil - } - - var served int - - if startFromTop { - // start serving from the front/top and keep going until we hit mark or get x amount items - serveloopFromTop: - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") - } - - if entry.itemID == beforeID { - break serveloopFromTop - } - - // serve up to the amount requested - items = append(items, entry.prepared) - served++ - if served >= amount { - break serveloopFromTop - } - } - } else if !startFromTop { - // start serving from the entry right before the mark - serveloopFromBottom: - for e := beforeIDMark.Prev(); e != nil; e = e.Prev() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") - } - - // serve up to the amount requested - items = append(items, entry.prepared) - served++ - if served >= amount { - break serveloopFromBottom - } - } - } - - return items, nil -} - -// getXBetweenID returns x amount of items from the given maxID, up to the given id, from newest to oldest. -// This will NOT include the item with the given IDs. -// -// This corresponds to an api call to /timelines/home?since_id=WHATEVER&max_id=WHATEVER_ELSE -func (t *timeline) getXBetweenID(ctx context.Context, amount int, behindID string, beforeID string) ([]Preparable, error) { - // make a slice of items with the length we need to return - items := make([]Preparable, 0, amount) - - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - } - - // iterate through the modified list until we hit the mark we're looking for - var position int - var behindIDMark *list.Element -findMarkLoop: - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - position++ - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry") - } - - if entry.itemID == behindID { - behindIDMark = e - break findMarkLoop - } - } - - // we didn't find it - if behindIDMark == nil { - return nil, fmt.Errorf("getXBetweenID: couldn't find item with ID %s", behindID) - } - - // make sure we have enough items prepared behind it to return what we're being asked for - if t.preparedItems.data.Len() < amount+position { - if err := t.prepareBehind(ctx, behindID, amount); err != nil { - return nil, err - } - } - - // start serving from the entry right after the mark - var served int -serveloop: - for e := behindIDMark.Next(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry") - } - - if entry.itemID == beforeID { - break serveloop - } - - // serve up to the amount requested - items = append(items, entry.prepared) - served++ - if served >= amount { - break serveloop - } - } - - return items, nil + }() } diff --git a/internal/timeline/get_test.go b/internal/timeline/get_test.go index 071f33aaf..444c159c4 100644 --- a/internal/timeline/get_test.go +++ b/internal/timeline/get_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/processing" "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/visibility" @@ -43,8 +44,8 @@ func (suite *GetTestSuite) SetupSuite() { func (suite *GetTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.InitTestLog() testrig.InitTestConfig() + testrig.InitTestLog() suite.db = testrig.NewTestDB(&suite.state) suite.tc = testrig.NewTestTypeConverter(suite.db) @@ -52,8 +53,9 @@ func (suite *GetTestSuite) SetupTest() { testrig.StandardDBSetup(suite.db, nil) - // let's take local_account_1 as the timeline owner - tl, err := timeline.NewTimeline( + // Take local_account_1 as the timeline owner, it + // doesn't really matter too much for these tests. + tl := timeline.NewTimeline( context.Background(), suite.testAccounts["local_account_1"].ID, processing.StatusGrabFunction(suite.db), @@ -61,20 +63,27 @@ func (suite *GetTestSuite) SetupTest() { processing.StatusPrepareFunction(suite.db, suite.tc), processing.StatusSkipInsertFunction(), ) - if err != nil { - suite.FailNow(err.Error()) - } - // put the status IDs in a determinate order since we can't trust a map to keep its order + // Put testrig statuses in a determinate order + // since we can't trust a map to keep order. statuses := []*gtsmodel.Status{} for _, s := range suite.testStatuses { statuses = append(statuses, s) } + sort.Slice(statuses, func(i, j int) bool { return statuses[i].ID > statuses[j].ID }) - // prepare the timeline by just shoving all test statuses in it -- let's not be fussy about who sees what + // Statuses are now highest -> lowest. + suite.highestStatusID = statuses[0].ID + suite.lowestStatusID = statuses[len(statuses)-1].ID + if suite.highestStatusID < suite.lowestStatusID { + suite.FailNow("", "statuses weren't ordered properly by sort") + } + + // Put all test statuses into the timeline; we don't + // need to be fussy about who sees what for these tests. for _, s := range statuses { _, err := tl.IndexAndPrepareOne(context.Background(), s.GetID(), s.BoostOfID, s.AccountID, s.BoostOfAccountID) if err != nil { @@ -89,219 +98,292 @@ func (suite *GetTestSuite) TearDownTest() { testrig.StandardDBTeardown(suite.db) } -func (suite *GetTestSuite) TestGetDefault() { - // lastGot should be zero - suite.Zero(suite.timeline.LastGot()) - - // get 10 20 the top and don't prepare the next query - statuses, err := suite.timeline.Get(context.Background(), 20, "", "", "", false) - if err != nil { - suite.FailNow(err.Error()) +func (suite *GetTestSuite) checkStatuses(statuses []timeline.Preparable, maxID string, minID string, expectedLength int) { + if l := len(statuses); l != expectedLength { + suite.FailNow("", "expected %d statuses in slice, got %d", expectedLength, l) + } else if l == 0 { + // Can't test empty slice. + return } - // we only have 16 statuses in the test suite - suite.Len(statuses, 17) + // Check ordering + bounds of statuses. + highest := statuses[0].GetID() + for _, status := range statuses { + id := status.GetID() - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() + if id >= maxID { + suite.FailNow("", "%s greater than maxID %s", id, maxID) } - } - // lastGot should be up to date - suite.WithinDuration(time.Now(), suite.timeline.LastGot(), 1*time.Second) + if id <= minID { + suite.FailNow("", "%s smaller than minID %s", id, minID) + } + + if id > highest { + suite.FailNow("", "statuses in slice were not ordered highest -> lowest ID") + } + + highest = id + } } -func (suite *GetTestSuite) TestGetDefaultPrepareNext() { - // get 10 from the top and prepare the next query - statuses, err := suite.timeline.Get(context.Background(), 10, "", "", "", true) +func (suite *GetTestSuite) TestGetNewTimelinePageDown() { + // Take a fresh timeline for this test. + // This tests whether indexing works + // properly against uninitialized timelines. + tl := timeline.NewTimeline( + context.Background(), + suite.testAccounts["local_account_1"].ID, + processing.StatusGrabFunction(suite.db), + processing.StatusFilterFunction(suite.db, suite.filter), + processing.StatusPrepareFunction(suite.db, suite.tc), + processing.StatusSkipInsertFunction(), + ) + + // Get 5 from the top. + statuses, err := tl.Get(context.Background(), 5, "", "", "", true) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, id.Highest, id.Lowest, 5) + + // Get 5 from next maxID. + nextMaxID := statuses[len(statuses)-1].GetID() + statuses, err = tl.Get(context.Background(), 5, nextMaxID, "", "", false) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, nextMaxID, id.Lowest, 5) +} + +func (suite *GetTestSuite) TestGetNewTimelinePageUp() { + // Take a fresh timeline for this test. + // This tests whether indexing works + // properly against uninitialized timelines. + tl := timeline.NewTimeline( + context.Background(), + suite.testAccounts["local_account_1"].ID, + processing.StatusGrabFunction(suite.db), + processing.StatusFilterFunction(suite.db, suite.filter), + processing.StatusPrepareFunction(suite.db, suite.tc), + processing.StatusSkipInsertFunction(), + ) + + // Get 5 from the back. + statuses, err := tl.Get(context.Background(), 5, "", "", id.Lowest, false) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, id.Highest, id.Lowest, 5) + + // Page upwards. + nextMinID := statuses[len(statuses)-1].GetID() + statuses, err = tl.Get(context.Background(), 5, "", "", nextMinID, false) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, id.Highest, nextMinID, 5) +} + +func (suite *GetTestSuite) TestGetNewTimelineMoreThanPossible() { + // Take a fresh timeline for this test. + // This tests whether indexing works + // properly against uninitialized timelines. + tl := timeline.NewTimeline( + context.Background(), + suite.testAccounts["local_account_1"].ID, + processing.StatusGrabFunction(suite.db), + processing.StatusFilterFunction(suite.db, suite.filter), + processing.StatusPrepareFunction(suite.db, suite.tc), + processing.StatusSkipInsertFunction(), + ) + + // Get 100 from the top. + statuses, err := tl.Get(context.Background(), 100, id.Highest, "", "", false) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, id.Highest, id.Lowest, 16) +} + +func (suite *GetTestSuite) TestGetNewTimelineMoreThanPossiblePageUp() { + // Take a fresh timeline for this test. + // This tests whether indexing works + // properly against uninitialized timelines. + tl := timeline.NewTimeline( + context.Background(), + suite.testAccounts["local_account_1"].ID, + processing.StatusGrabFunction(suite.db), + processing.StatusFilterFunction(suite.db, suite.filter), + processing.StatusPrepareFunction(suite.db, suite.tc), + processing.StatusSkipInsertFunction(), + ) + + // Get 100 from the back. + statuses, err := tl.Get(context.Background(), 100, "", "", id.Lowest, false) + if err != nil { + suite.FailNow(err.Error()) + } + suite.checkStatuses(statuses, id.Highest, id.Lowest, 16) +} + +func (suite *GetTestSuite) TestGetNoParams() { + // Get 10 statuses from the top (no params). + statuses, err := suite.timeline.Get(context.Background(), 10, "", "", "", false) if err != nil { suite.FailNow(err.Error()) } - suite.Len(statuses, 10) + suite.checkStatuses(statuses, id.Highest, id.Lowest, 10) - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } - - // sleep a second so the next query can run - time.Sleep(1 * time.Second) + // First status should have the highest ID in the testrig. + suite.Equal(suite.highestStatusID, statuses[0].GetID()) } func (suite *GetTestSuite) TestGetMaxID() { - // ask for 10 with a max ID somewhere in the middle of the stack - statuses, err := suite.timeline.Get(context.Background(), 10, "01F8MHBQCBTDKN6X5VHGMMN4MA", "", "", false) + // Ask for 10 with a max ID somewhere in the middle of the stack. + maxID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + + statuses, err := suite.timeline.Get(context.Background(), 10, maxID, "", "", false) if err != nil { suite.FailNow(err.Error()) } - // we should only get 6 statuses back, since we asked for a max ID that excludes some of our entries - suite.Len(statuses, 6) - - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } -} - -func (suite *GetTestSuite) TestGetMaxIDPrepareNext() { - // ask for 10 with a max ID somewhere in the middle of the stack - statuses, err := suite.timeline.Get(context.Background(), 10, "01F8MHBQCBTDKN6X5VHGMMN4MA", "", "", true) - if err != nil { - suite.FailNow(err.Error()) - } - - // we should only get 6 statuses back, since we asked for a max ID that excludes some of our entries - suite.Len(statuses, 6) - - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } - - // sleep a second so the next query can run - time.Sleep(1 * time.Second) -} - -func (suite *GetTestSuite) TestGetMinID() { - // ask for 15 with a min ID somewhere in the middle of the stack - statuses, err := suite.timeline.Get(context.Background(), 10, "", "01F8MHBQCBTDKN6X5VHGMMN4MA", "", false) - if err != nil { - suite.FailNow(err.Error()) - } - - // we should only get 10 statuses back, since we asked for a min ID that excludes some of our entries - suite.Len(statuses, 10) - - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } + // We'll only get 6 statuses back. + suite.checkStatuses(statuses, maxID, id.Lowest, 6) } func (suite *GetTestSuite) TestGetSinceID() { - // ask for 15 with a since ID somewhere in the middle of the stack - statuses, err := suite.timeline.Get(context.Background(), 15, "", "", "01F8MHBQCBTDKN6X5VHGMMN4MA", false) + // Ask for 10 with a since ID somewhere in the middle of the stack. + sinceID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + statuses, err := suite.timeline.Get(context.Background(), 10, "", sinceID, "", false) if err != nil { suite.FailNow(err.Error()) } - // we should only get 10 statuses back, since we asked for a since ID that excludes some of our entries - suite.Len(statuses, 10) + suite.checkStatuses(statuses, id.Highest, sinceID, 10) - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } + // The first status in the stack should have the highest ID of all + // in the testrig, because we're paging down. + suite.Equal(suite.highestStatusID, statuses[0].GetID()) } -func (suite *GetTestSuite) TestGetSinceIDPrepareNext() { - // ask for 15 with a since ID somewhere in the middle of the stack - statuses, err := suite.timeline.Get(context.Background(), 15, "", "", "01F8MHBQCBTDKN6X5VHGMMN4MA", true) +func (suite *GetTestSuite) TestGetSinceIDOneOnly() { + // Ask for 1 with a since ID somewhere in the middle of the stack. + sinceID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + statuses, err := suite.timeline.Get(context.Background(), 1, "", sinceID, "", false) if err != nil { suite.FailNow(err.Error()) } - // we should only get 10 statuses back, since we asked for a since ID that excludes some of our entries - suite.Len(statuses, 10) + suite.checkStatuses(statuses, id.Highest, sinceID, 1) - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } + // The one status we got back should have the highest ID of all in + // the testrig, because using sinceID means we're paging down. + suite.Equal(suite.highestStatusID, statuses[0].GetID()) +} + +func (suite *GetTestSuite) TestGetMinID() { + // Ask for 5 with a min ID somewhere in the middle of the stack. + minID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + statuses, err := suite.timeline.Get(context.Background(), 5, "", "", minID, false) + if err != nil { + suite.FailNow(err.Error()) } - // sleep a second so the next query can run - time.Sleep(1 * time.Second) + suite.checkStatuses(statuses, id.Highest, minID, 5) + + // We're paging up so even the highest status ID in the pile + // shouldn't be the highest ID we have. + suite.NotEqual(suite.highestStatusID, statuses[0]) +} + +func (suite *GetTestSuite) TestGetMinIDOneOnly() { + // Ask for 1 with a min ID somewhere in the middle of the stack. + minID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + statuses, err := suite.timeline.Get(context.Background(), 1, "", "", minID, false) + if err != nil { + suite.FailNow(err.Error()) + } + + suite.checkStatuses(statuses, id.Highest, minID, 1) + + // The one status we got back should have the an ID equal to the + // one ID immediately newer than it. + suite.Equal("01F8MHC0H0A7XHTVH5F596ZKBM", statuses[0].GetID()) +} + +func (suite *GetTestSuite) TestGetMinIDFromLowestInTestrig() { + // Ask for 1 with minID equal to the lowest status in the testrig. + minID := suite.lowestStatusID + statuses, err := suite.timeline.Get(context.Background(), 1, "", "", minID, false) + if err != nil { + suite.FailNow(err.Error()) + } + + suite.checkStatuses(statuses, id.Highest, minID, 1) + + // The one status we got back should have an id higher than + // the lowest status in the testrig, since minID is not inclusive. + suite.Greater(statuses[0].GetID(), suite.lowestStatusID) +} + +func (suite *GetTestSuite) TestGetMinIDFromLowestPossible() { + // Ask for 1 with the lowest possible min ID. + minID := id.Lowest + statuses, err := suite.timeline.Get(context.Background(), 1, "", "", minID, false) + if err != nil { + suite.FailNow(err.Error()) + } + + suite.checkStatuses(statuses, id.Highest, minID, 1) + + // The one status we got back should have the an ID equal to the + // lowest ID status in the test rig. + suite.Equal(suite.lowestStatusID, statuses[0].GetID()) } func (suite *GetTestSuite) TestGetBetweenID() { - // ask for 10 between these two IDs - statuses, err := suite.timeline.Get(context.Background(), 10, "01F8MHCP5P2NWYQ416SBA0XSEV", "", "01F8MHBQCBTDKN6X5VHGMMN4MA", false) + // Ask for 10 between these two IDs + maxID := "01F8MHCP5P2NWYQ416SBA0XSEV" + minID := "01F8MHBQCBTDKN6X5VHGMMN4MA" + + statuses, err := suite.timeline.Get(context.Background(), 10, maxID, "", minID, false) if err != nil { suite.FailNow(err.Error()) } - // we should only get 2 statuses back, since there are only two statuses between the given IDs - suite.Len(statuses, 2) - - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } + // There's only two statuses between these two IDs. + suite.checkStatuses(statuses, maxID, minID, 2) } -func (suite *GetTestSuite) TestGetBetweenIDPrepareNext() { - // ask for 10 between these two IDs - statuses, err := suite.timeline.Get(context.Background(), 10, "01F8MHCP5P2NWYQ416SBA0XSEV", "", "01F8MHBQCBTDKN6X5VHGMMN4MA", true) +func (suite *GetTestSuite) TestGetBetweenIDImpossible() { + // Ask for 10 between these two IDs which present + // an impossible query. + maxID := id.Lowest + minID := id.Highest + + statuses, err := suite.timeline.Get(context.Background(), 10, maxID, "", minID, false) if err != nil { suite.FailNow(err.Error()) } - // we should only get 2 statuses back, since there are only two statuses between the given IDs - suite.Len(statuses, 2) + // We should have nothing back. + suite.checkStatuses(statuses, maxID, minID, 0) +} - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } +func (suite *GetTestSuite) TestLastGot() { + // LastGot should be zero + suite.Zero(suite.timeline.LastGot()) + + // Get some from the top + _, err := suite.timeline.Get(context.Background(), 10, "", "", "", false) + if err != nil { + suite.FailNow(err.Error()) } - // sleep a second so the next query can run - time.Sleep(1 * time.Second) + // LastGot should be updated + suite.WithinDuration(time.Now(), suite.timeline.LastGot(), 1*time.Second) } func TestGetTestSuite(t *testing.T) { diff --git a/internal/timeline/index.go b/internal/timeline/index.go index 3ab8dbeb9..a45617134 100644 --- a/internal/timeline/index.go +++ b/internal/timeline/index.go @@ -24,103 +24,205 @@ import ( "fmt" "codeberg.org/gruf/go-kv" + "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/log" ) -func (t *timeline) ItemIndexLength(ctx context.Context) int { - if t.indexedItems == nil || t.indexedItems.data == nil { - return 0 - } - return t.indexedItems.data.Len() -} +func (t *timeline) indexXBetweenIDs(ctx context.Context, amount int, behindID string, beforeID string, frontToBack bool) error { + l := log. + WithContext(ctx). + WithFields(kv.Fields{ + {"amount", amount}, + {"behindID", behindID}, + {"beforeID", beforeID}, + {"frontToBack", frontToBack}, + }...) + l.Trace("entering indexXBetweenIDs") -func (t *timeline) indexBehind(ctx context.Context, itemID string, amount int) error { - l := log.WithContext(ctx). - WithFields(kv.Fields{{"amount", amount}}...) - - // lazily initialize index if it hasn't been done already - if t.indexedItems.data == nil { - t.indexedItems.data = &list.List{} - t.indexedItems.data.Init() - } - - // If we're already indexedBehind given itemID by the required amount, we can return nil. - // First find position of itemID (or as near as possible). - var position int -positionLoop: - for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return errors.New("indexBehind: could not parse e as an itemIndexEntry") - } - - if entry.itemID <= itemID { - // we've found it - break positionLoop - } - position++ - } - - // now check if the length of indexed items exceeds the amount of items required (position of itemID, plus amount of posts requested after that) - if t.indexedItems.data.Len() > position+amount { - // we have enough indexed behind already to satisfy amount, so don't need to make db calls - l.Trace("returning nil since we already have enough items indexed") + if beforeID >= behindID { + // This is an impossible situation, we + // can't index anything between these. return nil } - toIndex := []Timelineable{} - offsetID := itemID + t.Lock() + defer t.Unlock() - l.Trace("entering grabloop") -grabloop: - for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only - // first grab items using the caller-provided grab function - l.Trace("grabbing...") - items, stop, err := t.grabFunction(ctx, t.accountID, offsetID, "", "", amount) - if err != nil { - return err - } - if stop { - break grabloop + // Lazily init indexed items. + if t.items.data == nil { + t.items.data = &list.List{} + t.items.data.Init() + } + + // Start by mapping out the list so we know what + // we have to do. Depending on the current state + // of the list we might not have to do *anything*. + var ( + position int + listLen = t.items.data.Len() + behindIDPosition int + beforeIDPosition int + ) + + for e := t.items.data.Front(); e != nil; e = e.Next() { + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + + position++ + + if entry.itemID > behindID { + l.Trace("item is too new, continuing") + continue } - l.Trace("filtering...") - // now filter each item using the caller-provided filter function - for _, item := range items { - shouldIndex, err := t.filterFunction(ctx, t.accountID, item) - if err != nil { - return err - } - if shouldIndex { - toIndex = append(toIndex, item) - } - offsetID = item.GetID() + if behindIDPosition == 0 { + // Gone far enough through the list + // and found our behindID mark. + // We only need to set this once. + l.Tracef("found behindID mark %s at position %d", entry.itemID, position) + behindIDPosition = position + } + + if entry.itemID >= beforeID { + // Push the beforeID mark back + // one place every iteration. + l.Tracef("setting beforeID mark %s at position %d", entry.itemID, position) + beforeIDPosition = position + } + + if entry.itemID <= beforeID { + // We've gone beyond the bounds of + // items we're interested in; stop. + l.Trace("reached older items, breaking") + break } } - l.Trace("left grabloop") - // index the items we got - for _, s := range toIndex { - if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil { - return fmt.Errorf("indexBehind: error indexing item with id %s: %s", s.GetID(), err) + // We can now figure out if we need to make db calls. + var grabMore bool + switch { + case listLen < amount: + // The whole list is shorter than the + // amount we're being asked to return, + // make up the difference. + grabMore = true + amount -= listLen + case beforeIDPosition-behindIDPosition < amount: + // Not enough items between behindID and + // beforeID to return amount required, + // try to get more. + grabMore = true + } + + if !grabMore { + // We're good! + return nil + } + + // Fetch additional items. + items, err := t.grab(ctx, amount, behindID, beforeID, frontToBack) + if err != nil { + return err + } + + // Index all the items we got. We already have + // a lock on the timeline, so don't call IndexOne + // here, since that will also try to get a lock! + for _, item := range items { + entry := &indexedItemsEntry{ + itemID: item.GetID(), + boostOfID: item.GetBoostOfID(), + accountID: item.GetAccountID(), + boostOfAccountID: item.GetBoostOfAccountID(), + } + + if _, err := t.items.insertIndexed(ctx, entry); err != nil { + return fmt.Errorf("error inserting entry with itemID %s into index: %w", entry.itemID, err) } } return nil } -func (t *timeline) IndexOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) { - t.Lock() - defer t.Unlock() +// grab wraps the timeline's grabFunction in paging + filtering logic. +func (t *timeline) grab(ctx context.Context, amount int, behindID string, beforeID string, frontToBack bool) ([]Timelineable, error) { + var ( + sinceID string + minID string + grabbed int + maxID = behindID + filtered = make([]Timelineable, 0, amount) + ) - postIndexEntry := &indexedItemsEntry{ - itemID: itemID, - boostOfID: boostOfID, - accountID: accountID, - boostOfAccountID: boostOfAccountID, + if frontToBack { + sinceID = beforeID + } else { + minID = beforeID } - return t.indexedItems.insertIndexed(ctx, postIndexEntry) + for attempts := 0; attempts < 5; attempts++ { + if grabbed >= amount { + // We got everything we needed. + break + } + + items, stop, err := t.grabFunction( + ctx, + t.accountID, + maxID, + sinceID, + minID, + // Don't grab more than we need to. + amount-grabbed, + ) + + if err != nil { + // Grab function already checks for + // db.ErrNoEntries, so if an error + // is returned then it's a real one. + return nil, err + } + + if stop || len(items) == 0 { + // No items left. + break + } + + // Set next query parameters. + if frontToBack { + // Page down. + maxID = items[len(items)-1].GetID() + if maxID <= beforeID { + // Can't go any further. + break + } + } else { + // Page up. + minID = items[0].GetID() + if minID >= behindID { + // Can't go any further. + break + } + } + + for _, item := range items { + ok, err := t.filterFunction(ctx, t.accountID, item) + if err != nil { + if !errors.Is(err, db.ErrNoEntries) { + // Real error here. + return nil, err + } + log.Warnf(ctx, "errNoEntries while filtering item %s: %s", item.GetID(), err) + continue + } + + if ok { + filtered = append(filtered, item) + grabbed++ // count this as grabbed + } + } + } + + return filtered, nil } func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) { @@ -134,46 +236,49 @@ func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boos boostOfAccountID: boostOfAccountID, } - inserted, err := t.indexedItems.insertIndexed(ctx, postIndexEntry) + if inserted, err := t.items.insertIndexed(ctx, postIndexEntry); err != nil { + return false, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %w", err) + } else if !inserted { + // Entry wasn't inserted, so + // don't bother preparing it. + return false, nil + } + + preparable, err := t.prepareFunction(ctx, t.accountID, statusID) if err != nil { - return inserted, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err) + return true, fmt.Errorf("IndexAndPrepareOne: error preparing: %w", err) } + postIndexEntry.prepared = preparable - if inserted { - if err := t.prepare(ctx, statusID); err != nil { - return inserted, fmt.Errorf("IndexAndPrepareOne: error preparing: %s", err) - } - } - - return inserted, nil + return true, nil } -func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) { - var id string - if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Back() == nil { - // return an empty string if postindex hasn't been initialized yet - return id, nil +func (t *timeline) Len() int { + t.Lock() + defer t.Unlock() + + if t.items == nil || t.items.data == nil { + // indexedItems hasnt been initialized yet. + return 0 } - e := t.indexedItems.data.Back() - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return id, errors.New("OldestIndexedItemID: could not parse e as itemIndexEntry") - } - return entry.itemID, nil + return t.items.data.Len() } -func (t *timeline) NewestIndexedItemID(ctx context.Context) (string, error) { - var id string - if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Front() == nil { - // return an empty string if postindex hasn't been initialized yet - return id, nil +func (t *timeline) OldestIndexedItemID() string { + t.Lock() + defer t.Unlock() + + if t.items == nil || t.items.data == nil { + // indexedItems hasnt been initialized yet. + return "" } - e := t.indexedItems.data.Front() - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return id, errors.New("NewestIndexedItemID: could not parse e as itemIndexEntry") + e := t.items.data.Back() + if e == nil { + // List was empty. + return "" } - return entry.itemID, nil + + return e.Value.(*indexedItemsEntry).itemID //nolint:forcetypeassert } diff --git a/internal/timeline/index_test.go b/internal/timeline/index_test.go index d2d1741f6..76b161171 100644 --- a/internal/timeline/index_test.go +++ b/internal/timeline/index_test.go @@ -52,7 +52,7 @@ func (suite *IndexTestSuite) SetupTest() { testrig.StandardDBSetup(suite.db, nil) // let's take local_account_1 as the timeline owner, and start with an empty timeline - tl, err := timeline.NewTimeline( + suite.timeline = timeline.NewTimeline( context.Background(), suite.testAccounts["local_account_1"].ID, processing.StatusGrabFunction(suite.db), @@ -60,10 +60,6 @@ func (suite *IndexTestSuite) SetupTest() { processing.StatusPrepareFunction(suite.db, suite.tc), processing.StatusSkipInsertFunction(), ) - if err != nil { - suite.FailNow(err.Error()) - } - suite.timeline = tl } func (suite *IndexTestSuite) TearDownTest() { @@ -72,12 +68,11 @@ func (suite *IndexTestSuite) TearDownTest() { func (suite *IndexTestSuite) TestOldestIndexedItemIDEmpty() { // the oldest indexed post should be an empty string since there's nothing indexed yet - postID, err := suite.timeline.OldestIndexedItemID(context.Background()) - suite.NoError(err) + postID := suite.timeline.OldestIndexedItemID() suite.Empty(postID) // indexLength should be 0 - indexLength := suite.timeline.ItemIndexLength(context.Background()) + indexLength := suite.timeline.Len() suite.Equal(0, indexLength) } @@ -85,12 +80,12 @@ func (suite *IndexTestSuite) TestIndexAlreadyIndexed() { testStatus := suite.testStatuses["local_account_1_status_1"] // index one post -- it should be indexed - indexed, err := suite.timeline.IndexOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) + indexed, err := suite.timeline.IndexAndPrepareOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) suite.NoError(err) suite.True(indexed) // try to index the same post again -- it should not be indexed - indexed, err = suite.timeline.IndexOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) + indexed, err = suite.timeline.IndexAndPrepareOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) suite.NoError(err) suite.False(indexed) } @@ -120,12 +115,12 @@ func (suite *IndexTestSuite) TestIndexBoostOfAlreadyIndexed() { } // index one post -- it should be indexed - indexed, err := suite.timeline.IndexOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) + indexed, err := suite.timeline.IndexAndPrepareOne(context.Background(), testStatus.ID, testStatus.BoostOfID, testStatus.AccountID, testStatus.BoostOfAccountID) suite.NoError(err) suite.True(indexed) // try to index the a boost of that post -- it should not be indexed - indexed, err = suite.timeline.IndexOne(context.Background(), boostOfTestStatus.ID, boostOfTestStatus.BoostOfID, boostOfTestStatus.AccountID, boostOfTestStatus.BoostOfAccountID) + indexed, err = suite.timeline.IndexAndPrepareOne(context.Background(), boostOfTestStatus.ID, boostOfTestStatus.BoostOfID, boostOfTestStatus.AccountID, boostOfTestStatus.BoostOfAccountID) suite.NoError(err) suite.False(indexed) } diff --git a/internal/timeline/indexeditems.go b/internal/timeline/indexeditems.go index 07a23582e..ff36a66d2 100644 --- a/internal/timeline/indexeditems.go +++ b/internal/timeline/indexeditems.go @@ -20,7 +20,7 @@ package timeline import ( "container/list" "context" - "errors" + "fmt" ) type indexedItems struct { @@ -33,53 +33,87 @@ type indexedItemsEntry struct { boostOfID string accountID string boostOfAccountID string + prepared Preparable } +// WARNING: ONLY CALL THIS FUNCTION IF YOU ALREADY HAVE +// A LOCK ON THE TIMELINE CONTAINING THIS INDEXEDITEMS! func (i *indexedItems) insertIndexed(ctx context.Context, newEntry *indexedItemsEntry) (bool, error) { + // Lazily init indexed items. if i.data == nil { i.data = &list.List{} + i.data.Init() } - // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front if i.data.Len() == 0 { + // We have no entries yet, meaning this is both the + // newest + oldest entry, so just put it in the front. i.data.PushFront(newEntry) return true, nil } - var insertMark *list.Element - var position int - // We need to iterate through the index to make sure we put this item in the appropriate place according to when it was created. - // We also need to make sure we're not inserting a duplicate item -- this can happen sometimes and it's not nice UX (*shudder*). + var ( + insertMark *list.Element + currentPosition int + ) + + // We need to iterate through the index to make sure we put + // this item in the appropriate place according to its id. + // We also need to make sure we're not inserting a duplicate + // item -- this can happen sometimes and it's sucky UX. for e := i.data.Front(); e != nil; e = e.Next() { - position++ + currentPosition++ - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return false, errors.New("insertIndexed: could not parse e as an indexedItemsEntry") - } + currentEntry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert - skip, err := i.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) - if err != nil { - return false, err - } - if skip { + // Check if we need to skip inserting this item based on + // the current item. + // + // For example, if the new item is a boost, and the current + // item is the original, we may not want to insert the boost + // if it would appear very shortly after the original. + if skip, err := i.skipInsert( + ctx, + newEntry.itemID, + newEntry.accountID, + newEntry.boostOfID, + newEntry.boostOfAccountID, + currentEntry.itemID, + currentEntry.accountID, + currentEntry.boostOfID, + currentEntry.boostOfAccountID, + currentPosition, + ); err != nil { + return false, fmt.Errorf("insertIndexed: error calling skipInsert: %w", err) + } else if skip { + // We don't need to insert this at all, + // so we can safely bail. return false, nil } - // if the item to index is newer than e, insert it before e in the list - if insertMark == nil { - if newEntry.itemID > entry.itemID { - insertMark = e - } + if insertMark != nil { + // We already found our mark. + continue } + + if currentEntry.itemID > newEntry.itemID { + // We're still in items newer than + // the one we're trying to insert. + continue + } + + // We found our spot! + insertMark = e } - if insertMark != nil { - i.data.InsertBefore(newEntry, insertMark) + if insertMark == nil { + // We looked through the whole timeline and didn't find + // a mark, so the new item is the oldest item we've seen; + // insert it at the back. + i.data.PushBack(newEntry) return true, nil } - // if we reach this point it's the oldest item we've seen so put it at the back - i.data.PushBack(newEntry) + i.data.InsertBefore(newEntry, insertMark) return true, nil } diff --git a/internal/timeline/manager.go b/internal/timeline/manager.go index c3b6b8b0f..f34cee787 100644 --- a/internal/timeline/manager.go +++ b/internal/timeline/manager.go @@ -20,14 +20,18 @@ package timeline import ( "context" "fmt" - "strings" "sync" "time" - "codeberg.org/gruf/go-kv" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" ) +const ( + pruneLengthIndexed = 400 + pruneLengthPrepared = 50 +) + // Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines. // // By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed @@ -41,38 +45,37 @@ import ( // Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization. // Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served. type Manager interface { - // Ingest takes one item and indexes it into the timeline for the given account ID. - // - // It should already be established before calling this function that the item actually belongs in the timeline! - // - // The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where - // the item is a boosted status, but a boost of the original status or the status itself already exists recently in the timeline. - Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) - // IngestAndPrepare takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. + // IngestOne takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. // This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (eg., a new status is created). // // It should already be established before calling this function that the item actually belongs in the timeline! // // The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where // a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline. - IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) + IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) + // GetTimeline returns limit n amount of prepared entries from the timeline of the given account ID, in descending chronological order. - // If maxID is provided, it will return prepared entries from that maxID onwards, inclusive. GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) - // GetIndexedLength returns the amount of items that have been *indexed* for the given account ID. - GetIndexedLength(ctx context.Context, timelineAccountID string) int + + // GetIndexedLength returns the amount of items that have been indexed for the given account ID. + GetIndexedLength(ctx context.Context, accountID string) int + // GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account. - GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) - // PrepareXFromTop prepares limit n amount of items, based on their indexed representations, from the top of the index. - PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error + // Will be an empty string if nothing is (yet) indexed. + GetOldestIndexedID(ctx context.Context, accountID string) string + // Remove removes one item from the timeline of the given timelineAccountID - Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) + Remove(ctx context.Context, accountID string, itemID string) (int, error) + // WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines WipeItemFromAllTimelines(ctx context.Context, itemID string) error + // WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines. WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error + // Start starts hourly cleanup jobs for this timeline manager. Start() error + // Stop stops the timeline manager (currently a stub, doesn't do anything). Stop() error } @@ -97,31 +100,41 @@ type manager struct { } func (m *manager) Start() error { - // range through all timelines in the sync map once per hour + prune as necessary + // Start a background goroutine which iterates + // through all stored timelines once per hour, + // and cleans up old entries if that timeline + // hasn't been accessed in the last hour. go func() { for now := range time.NewTicker(1 * time.Hour).C { - m.accountTimelines.Range(func(key any, value any) bool { - timelineAccountID, ok := key.(string) + // Define the range function inside here, + // so that we can use the 'now' returned + // by the ticker, instead of having to call + // time.Now() multiple times. + // + // Unless it panics, this function always + // returns 'true', to continue the Range + // call through the sync.Map. + f := func(_ any, v any) bool { + timeline, ok := v.(Timeline) if !ok { - panic("couldn't parse timeline manager sync map key as string, this should never happen so panic") + log.Panic(nil, "couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") } - t, ok := value.(Timeline) - if !ok { - panic("couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") + if now.Sub(timeline.LastGot()) < 1*time.Hour { + // Timeline has been fetched in the + // last hour, move on to the next one. + return true } - anHourAgo := now.Add(-1 * time.Hour) - if lastGot := t.LastGot(); lastGot.Before(anHourAgo) { - amountPruned := t.Prune(defaultDesiredPreparedItemsLength, defaultDesiredIndexedItemsLength) - log.WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"amountPruned", amountPruned}, - }...).Info("pruned indexed and prepared items from timeline") + if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 { + log.WithField("accountID", timeline.AccountID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned) } return true - }) + } + + // Execute the function for each timeline. + m.accountTimelines.Range(f) } }() @@ -132,146 +145,69 @@ func (m *manager) Stop() error { return nil } -func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", item.GetID()}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return false, err - } - - l.Trace("ingesting item") - return t.IndexOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID()) +func (m *manager) IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) { + return m.getOrCreateTimeline(ctx, accountID).IndexAndPrepareOne( + ctx, + item.GetID(), + item.GetBoostOfID(), + item.GetAccountID(), + item.GetBoostOfAccountID(), + ) } -func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", item.GetID()}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return false, err - } - - l.Trace("ingesting item") - return t.IndexAndPrepareOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID()) +func (m *manager) Remove(ctx context.Context, accountID string, itemID string) (int, error) { + return m.getOrCreateTimeline(ctx, accountID).Remove(ctx, itemID) } -func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", itemID}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return 0, err - } - - l.Trace("removing item") - return t.Remove(ctx, itemID) +func (m *manager) GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { + return m.getOrCreateTimeline(ctx, accountID).Get(ctx, limit, maxID, sinceID, minID, true) } -func (m *manager) GetTimeline(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{{"timelineAccountID", timelineAccountID}}...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return nil, err - } - - items, err := t.Get(ctx, limit, maxID, sinceID, minID, true) - if err != nil { - l.Errorf("error getting statuses: %s", err) - } - return items, nil +func (m *manager) GetIndexedLength(ctx context.Context, accountID string) int { + return m.getOrCreateTimeline(ctx, accountID).Len() } -func (m *manager) GetIndexedLength(ctx context.Context, timelineAccountID string) int { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return 0 - } - - return t.ItemIndexLength(ctx) -} - -func (m *manager) GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return "", err - } - - return t.OldestIndexedItemID(ctx) -} - -func (m *manager) PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return err - } - - return t.PrepareFromTop(ctx, limit) +func (m *manager) GetOldestIndexedID(ctx context.Context, accountID string) string { + return m.getOrCreateTimeline(ctx, accountID).OldestIndexedItemID() } func (m *manager) WipeItemFromAllTimelines(ctx context.Context, statusID string) error { - errors := []string{} - m.accountTimelines.Range(func(k interface{}, i interface{}) bool { - t, ok := i.(Timeline) - if !ok { - panic("couldn't parse entry as Timeline, this should never happen so panic") + errors := gtserror.MultiError{} + + m.accountTimelines.Range(func(_ any, v any) bool { + if _, err := v.(Timeline).Remove(ctx, statusID); err != nil { + errors.Append(err) } - if _, err := t.Remove(ctx, statusID); err != nil { - errors = append(errors, err.Error()) - } - - return true + return true // always continue range }) - var err error if len(errors) > 0 { - err = fmt.Errorf("one or more errors removing status %s from all timelines: %s", statusID, strings.Join(errors, ";")) + return fmt.Errorf("WipeItemFromAllTimelines: one or more errors wiping status %s: %w", statusID, errors.Combine()) } - return err + return nil } func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return err - } - - _, err = t.RemoveAllBy(ctx, accountID) + _, err := m.getOrCreateTimeline(ctx, timelineAccountID).RemoveAllByOrBoosting(ctx, accountID) return err } -func (m *manager) getOrCreateTimeline(ctx context.Context, timelineAccountID string) (Timeline, error) { - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - var err error - t, err = NewTimeline(ctx, timelineAccountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction) - if err != nil { - return nil, err - } - m.accountTimelines.Store(timelineAccountID, t) - } else { - t, ok = i.(Timeline) - if !ok { - panic("couldn't parse entry as Timeline, this should never happen so panic") - } +// getOrCreateTimeline returns a timeline for the given +// accountID. If a timeline does not yet exist in the +// manager's sync.Map, it will be created and stored. +func (m *manager) getOrCreateTimeline(ctx context.Context, accountID string) Timeline { + i, ok := m.accountTimelines.Load(accountID) + if ok { + // Timeline already existed in sync.Map. + return i.(Timeline) //nolint:forcetypeassert } - return t, nil + // Timeline did not yet exist in sync.Map. + // Create + store it. + timeline := NewTimeline(ctx, accountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction) + m.accountTimelines.Store(accountID, timeline) + + return timeline } diff --git a/internal/timeline/manager_test.go b/internal/timeline/manager_test.go index 8fc4984d1..cf1f5be2b 100644 --- a/internal/timeline/manager_test.go +++ b/internal/timeline/manager_test.go @@ -72,23 +72,9 @@ func (suite *ManagerTestSuite) TestManagerIntegration() { suite.Equal(0, indexedLen) // oldestIndexed should be empty string since there's nothing indexed - oldestIndexed, err := suite.manager.GetOldestIndexedID(ctx, testAccount.ID) - suite.NoError(err) + oldestIndexed := suite.manager.GetOldestIndexedID(ctx, testAccount.ID) suite.Empty(oldestIndexed) - // trigger status preparation - err = suite.manager.PrepareXFromTop(ctx, testAccount.ID, 20) - suite.NoError(err) - - // local_account_1 can see 16 statuses out of the testrig statuses in its home timeline - indexedLen = suite.manager.GetIndexedLength(ctx, testAccount.ID) - suite.Equal(16, indexedLen) - - // oldest should now be set - oldestIndexed, err = suite.manager.GetOldestIndexedID(ctx, testAccount.ID) - suite.NoError(err) - suite.Equal("01F8MH75CBF9JFX4ZAD54N0W0R", oldestIndexed) - // get hometimeline statuses, err := suite.manager.GetTimeline(ctx, testAccount.ID, "", "", "", 20, false) suite.NoError(err) @@ -103,22 +89,20 @@ func (suite *ManagerTestSuite) TestManagerIntegration() { suite.Equal(15, indexedLen) // oldest should now be different - oldestIndexed, err = suite.manager.GetOldestIndexedID(ctx, testAccount.ID) - suite.NoError(err) + oldestIndexed = suite.manager.GetOldestIndexedID(ctx, testAccount.ID) suite.Equal("01F8MH82FYRXD2RC6108DAJ5HB", oldestIndexed) // delete the new oldest status specifically from this timeline, as though local_account_1 had muted or blocked it removed, err := suite.manager.Remove(ctx, testAccount.ID, "01F8MH82FYRXD2RC6108DAJ5HB") suite.NoError(err) - suite.Equal(2, removed) // 1 status should be removed, but from both indexed and prepared, so 2 removals total + suite.Equal(1, removed) // 1 status should be removed // timeline should be shorter indexedLen = suite.manager.GetIndexedLength(ctx, testAccount.ID) suite.Equal(14, indexedLen) // oldest should now be different - oldestIndexed, err = suite.manager.GetOldestIndexedID(ctx, testAccount.ID) - suite.NoError(err) + oldestIndexed = suite.manager.GetOldestIndexedID(ctx, testAccount.ID) suite.Equal("01F8MHAAY43M6RJ473VQFCVH37", oldestIndexed) // now remove all entries by local_account_2 from the timeline @@ -129,24 +113,18 @@ func (suite *ManagerTestSuite) TestManagerIntegration() { indexedLen = suite.manager.GetIndexedLength(ctx, testAccount.ID) suite.Equal(7, indexedLen) - // ingest 1 into the timeline - status1 := suite.testStatuses["admin_account_status_1"] - ingested, err := suite.manager.Ingest(ctx, status1, testAccount.ID) - suite.NoError(err) - suite.True(ingested) - // ingest and prepare another one into the timeline - status2 := suite.testStatuses["local_account_2_status_1"] - ingested, err = suite.manager.IngestAndPrepare(ctx, status2, testAccount.ID) + status := suite.testStatuses["local_account_2_status_1"] + ingested, err := suite.manager.IngestOne(ctx, testAccount.ID, status) suite.NoError(err) suite.True(ingested) // timeline should be longer now indexedLen = suite.manager.GetIndexedLength(ctx, testAccount.ID) - suite.Equal(9, indexedLen) + suite.Equal(8, indexedLen) - // try to ingest status 2 again - ingested, err = suite.manager.IngestAndPrepare(ctx, status2, testAccount.ID) + // try to ingest same status again + ingested, err = suite.manager.IngestOne(ctx, testAccount.ID, status) suite.NoError(err) suite.False(ingested) // should be false since it's a duplicate } diff --git a/internal/timeline/preparable.go b/internal/timeline/preparable.go deleted file mode 100644 index 510b5c231..000000000 --- a/internal/timeline/preparable.go +++ /dev/null @@ -1,25 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package timeline - -type Preparable interface { - GetID() string - GetAccountID() string - GetBoostOfID() string - GetBoostOfAccountID() string -} diff --git a/internal/timeline/prepare.go b/internal/timeline/prepare.go index a53aed78c..cc014037b 100644 --- a/internal/timeline/prepare.go +++ b/internal/timeline/prepare.go @@ -25,240 +25,114 @@ import ( "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" ) -func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error { - l := log.WithContext(ctx). - WithFields(kv.Fields{{"amount", amount}}...) - - // lazily initialize prepared posts if it hasn't been done already - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - t.preparedItems.data.Init() - } - - // if the postindex is nil, nothing has been indexed yet so index from the highest ID possible - if t.indexedItems.data == nil { - l.Debug("postindex.data was nil, indexing behind highest possible ID") - if err := t.indexBehind(ctx, id.Highest, amount); err != nil { - return fmt.Errorf("PrepareFromTop: error indexing behind id %s: %s", id.Highest, err) - } - } - - l.Trace("entering prepareloop") - t.Lock() - defer t.Unlock() - var prepared int -prepareloop: - for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { - if e == nil { - continue - } - - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return errors.New("PrepareFromTop: could not parse e as a postIndexEntry") - } - - if err := t.prepare(ctx, entry.itemID); err != nil { - // there's been an error - if err != db.ErrNoEntries { - // it's a real error - return fmt.Errorf("PrepareFromTop: error preparing status with id %s: %s", entry.itemID, err) - } - // the status just doesn't exist (anymore) so continue to the next one - continue - } - - prepared++ - if prepared == amount { - // we're done - l.Trace("leaving prepareloop") - break prepareloop - } - } - - l.Trace("leaving function") - return nil -} - -func (t *timeline) prepareNextQuery(ctx context.Context, amount int, maxID string, sinceID string, minID string) error { - l := log.WithContext(ctx). +func (t *timeline) prepareXBetweenIDs(ctx context.Context, amount int, behindID string, beforeID string, frontToBack bool) error { + l := log. + WithContext(ctx). WithFields(kv.Fields{ {"amount", amount}, - {"maxID", maxID}, - {"sinceID", sinceID}, - {"minID", minID}, + {"behindID", behindID}, + {"beforeID", beforeID}, + {"frontToBack", frontToBack}, }...) + l.Trace("entering prepareXBetweenIDs") - var err error - switch { - case maxID != "" && sinceID == "": - l.Debug("preparing behind maxID") - err = t.prepareBehind(ctx, maxID, amount) - case maxID == "" && sinceID != "": - l.Debug("preparing before sinceID") - err = t.prepareBefore(ctx, sinceID, false, amount) - case maxID == "" && minID != "": - l.Debug("preparing before minID") - err = t.prepareBefore(ctx, minID, false, amount) - } - - return err -} - -// prepareBehind instructs the timeline to prepare the next amount of entries for serialization, from position onwards. -// If include is true, then the given item ID will also be prepared, otherwise only entries behind it will be prepared. -func (t *timeline) prepareBehind(ctx context.Context, itemID string, amount int) error { - // lazily initialize prepared items if it hasn't been done already - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - t.preparedItems.data.Init() - } - - if err := t.indexBehind(ctx, itemID, amount); err != nil { - return fmt.Errorf("prepareBehind: error indexing behind id %s: %s", itemID, err) - } - - // if the itemindex is nil, nothing has been indexed yet so there's nothing to prepare - if t.indexedItems.data == nil { + if beforeID >= behindID { + // This is an impossible situation, we + // can't prepare anything between these. return nil } - var prepared int - var preparing bool + if err := t.indexXBetweenIDs(ctx, amount, behindID, beforeID, frontToBack); err != nil { + // An error here doesn't necessarily mean we + // can't prepare anything, so log + keep going. + l.Debugf("error calling prepareXBetweenIDs: %s", err) + } + t.Lock() defer t.Unlock() -prepareloop: - for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return errors.New("prepareBehind: could not parse e as itemIndexEntry") - } - if !preparing { - // we haven't hit the position we need to prepare from yet - if entry.itemID == itemID { - preparing = true - } - } + // Try to prepare everything between (and including) the two points. + var ( + toPrepare = make(map[*list.Element]*indexedItemsEntry) + foundToPrepare int + ) - if preparing { - if err := t.prepare(ctx, entry.itemID); err != nil { - // there's been an error - if err != db.ErrNoEntries { - // it's a real error - return fmt.Errorf("prepareBehind: error preparing item with id %s: %s", entry.itemID, err) - } - // the status just doesn't exist (anymore) so continue to the next one + if frontToBack { + // Paging forwards / down. + for e := t.items.data.Front(); e != nil; e = e.Next() { + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + + if entry.itemID > behindID { + l.Trace("item is too new, continuing") continue } - if prepared == amount { - // we're done - break prepareloop + + if entry.itemID < beforeID { + // We've gone beyond the bounds of + // items we're interested in; stop. + l.Trace("reached older items, breaking") + break + } + + // Only prepare entry if it's not + // already prepared, save db calls. + if entry.prepared == nil { + toPrepare[e] = entry + } + + foundToPrepare++ + if foundToPrepare >= amount { + break } - prepared++ } + } else { + // Paging backwards / up. + for e := t.items.data.Back(); e != nil; e = e.Prev() { + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + + if entry.itemID < beforeID { + l.Trace("item is too old, continuing") + continue + } + + if entry.itemID > behindID { + // We've gone beyond the bounds of + // items we're interested in; stop. + l.Trace("reached newer items, breaking") + break + } + + if entry.prepared == nil { + toPrepare[e] = entry + } + + // Only prepare entry if it's not + // already prepared, save db calls. + foundToPrepare++ + if foundToPrepare >= amount { + break + } + } + } + + for e, entry := range toPrepare { + prepared, err := t.prepareFunction(ctx, t.accountID, entry.itemID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // ErrNoEntries means something has been deleted, + // so we'll likely not be able to ever prepare this. + // This means we can remove it and skip past it. + l.Debugf("db.ErrNoEntries while trying to prepare %s; will remove from timeline", entry.itemID) + t.items.data.Remove(e) + } + // We've got a proper db error. + return fmt.Errorf("prepareXBetweenIDs: db error while trying to prepare %s: %w", entry.itemID, err) + } + entry.prepared = prepared } return nil } - -func (t *timeline) prepareBefore(ctx context.Context, statusID string, include bool, amount int) error { - t.Lock() - defer t.Unlock() - - // lazily initialize prepared posts if it hasn't been done already - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - t.preparedItems.data.Init() - } - - // if the postindex is nil, nothing has been indexed yet so there's nothing to prepare - if t.indexedItems.data == nil { - return nil - } - - var prepared int - var preparing bool -prepareloop: - for e := t.indexedItems.data.Back(); e != nil; e = e.Prev() { - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return errors.New("prepareBefore: could not parse e as a postIndexEntry") - } - - if !preparing { - // we haven't hit the position we need to prepare from yet - if entry.itemID == statusID { - preparing = true - if !include { - continue - } - } - } - - if preparing { - if err := t.prepare(ctx, entry.itemID); err != nil { - // there's been an error - if err != db.ErrNoEntries { - // it's a real error - return fmt.Errorf("prepareBefore: error preparing status with id %s: %s", entry.itemID, err) - } - // the status just doesn't exist (anymore) so continue to the next one - continue - } - if prepared == amount { - // we're done - break prepareloop - } - prepared++ - } - } - - return nil -} - -func (t *timeline) prepare(ctx context.Context, itemID string) error { - // trigger the caller-provided prepare function - prepared, err := t.prepareFunction(ctx, t.accountID, itemID) - if err != nil { - return err - } - - // shove it in prepared items as a prepared items entry - preparedItemsEntry := &preparedItemsEntry{ - itemID: prepared.GetID(), - boostOfID: prepared.GetBoostOfID(), - accountID: prepared.GetAccountID(), - boostOfAccountID: prepared.GetBoostOfAccountID(), - prepared: prepared, - } - - return t.preparedItems.insertPrepared(ctx, preparedItemsEntry) -} - -// oldestPreparedItemID returns the id of the rearmost (ie., the oldest) prepared item, or an error if something goes wrong. -// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. -func (t *timeline) oldestPreparedItemID(ctx context.Context) (string, error) { - var id string - if t.preparedItems == nil || t.preparedItems.data == nil { - // return an empty string if prepared items hasn't been initialized yet - return id, nil - } - - e := t.preparedItems.data.Back() - if e == nil { - // return an empty string if there's no back entry (ie., the index list hasn't been initialized yet) - return id, nil - } - - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return id, errors.New("oldestPreparedItemID: could not parse e as a preparedItemsEntry") - } - - return entry.itemID, nil -} diff --git a/internal/timeline/prepareditems.go b/internal/timeline/prepareditems.go deleted file mode 100644 index 86bb6ad69..000000000 --- a/internal/timeline/prepareditems.go +++ /dev/null @@ -1,91 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package timeline - -import ( - "container/list" - "context" - "errors" -) - -type preparedItems struct { - data *list.List - skipInsert SkipInsertFunction -} - -type preparedItemsEntry struct { - itemID string - boostOfID string - accountID string - boostOfAccountID string - prepared Preparable -} - -func (p *preparedItems) insertPrepared(ctx context.Context, newEntry *preparedItemsEntry) error { - if p.data == nil { - p.data = &list.List{} - } - - // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front - if p.data.Len() == 0 { - p.data.PushFront(newEntry) - return nil - } - - var insertMark *list.Element - var position int - // We need to iterate through the index to make sure we put this entry in the appropriate place according to when it was created. - // We also need to make sure we're not inserting a duplicate entry -- this can happen sometimes and it's not nice UX (*shudder*). - for e := p.data.Front(); e != nil; e = e.Next() { - position++ - - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return errors.New("insertPrepared: could not parse e as a preparedItemsEntry") - } - - skip, err := p.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) - if err != nil { - return err - } - if skip { - return nil - } - - // if the entry to index is newer than e, insert it before e in the list - if insertMark == nil { - if newEntry.itemID > entry.itemID { - insertMark = e - } - } - - // make sure we don't insert a duplicate - if entry.itemID == newEntry.itemID { - return nil - } - } - - if insertMark != nil { - p.data.InsertBefore(newEntry, insertMark) - return nil - } - - // if we reach this point it's the oldest entry we've seen so put it at the back - p.data.PushBack(newEntry) - return nil -} diff --git a/internal/timeline/prune.go b/internal/timeline/prune.go index 8360032b7..a3a5bf9cb 100644 --- a/internal/timeline/prune.go +++ b/internal/timeline/prune.go @@ -21,47 +21,63 @@ import ( "container/list" ) -const ( - defaultDesiredIndexedItemsLength = 400 - defaultDesiredPreparedItemsLength = 50 -) - func (t *timeline) Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int { t.Lock() defer t.Unlock() - pruneList := func(pruneTo int, listToPrune *list.List) int { - if listToPrune == nil { - // no need to prune - return 0 - } - - unprunedLength := listToPrune.Len() - if unprunedLength <= pruneTo { - // no need to prune - return 0 - } - - // work from the back + assemble a slice of entries that we will prune - amountStillToPrune := unprunedLength - pruneTo - itemsToPrune := make([]*list.Element, 0, amountStillToPrune) - for e := listToPrune.Back(); amountStillToPrune > 0; e = e.Prev() { - itemsToPrune = append(itemsToPrune, e) - amountStillToPrune-- - } - - // remove the entries we found - var totalPruned int - for _, e := range itemsToPrune { - listToPrune.Remove(e) - totalPruned++ - } - - return totalPruned + l := t.items.data + if l == nil { + // Nothing to prune. + return 0 } - prunedPrepared := pruneList(desiredPreparedItemsLength, t.preparedItems.data) - prunedIndexed := pruneList(desiredIndexedItemsLength, t.indexedItems.data) + var ( + position int + totalPruned int + toRemove *[]*list.Element + ) - return prunedPrepared + prunedIndexed + // Only initialize toRemove if we know we're + // going to need it, otherwise skiperino. + if toRemoveLen := t.items.data.Len() - desiredIndexedItemsLength; toRemoveLen > 0 { + toRemove = func() *[]*list.Element { tr := make([]*list.Element, 0, toRemoveLen); return &tr }() + } + + // Work from the front of the list until we get + // to the point where we need to start pruning. + for e := l.Front(); e != nil; e = e.Next() { + position++ + + if position <= desiredPreparedItemsLength { + // We're still within our allotted + // prepped length, nothing to do yet. + continue + } + + // We need to *at least* unprepare this entry. + // If we're beyond our indexed length already, + // we can just remove the item completely. + if position > desiredIndexedItemsLength { + *toRemove = append(*toRemove, e) + totalPruned++ + continue + } + + entry := e.Value.(*indexedItemsEntry) //nolint:forcetypeassert + if entry.prepared == nil { + // It's already unprepared (mood). + continue + } + + entry.prepared = nil // <- eat this up please garbage collector nom nom nom + totalPruned++ + } + + if toRemove != nil { + for _, e := range *toRemove { + l.Remove(e) + } + } + + return totalPruned } diff --git a/internal/timeline/prune_test.go b/internal/timeline/prune_test.go index 7daf88b83..89164563b 100644 --- a/internal/timeline/prune_test.go +++ b/internal/timeline/prune_test.go @@ -52,7 +52,7 @@ func (suite *PruneTestSuite) SetupTest() { testrig.StandardDBSetup(suite.db, nil) // let's take local_account_1 as the timeline owner - tl, err := timeline.NewTimeline( + tl := timeline.NewTimeline( context.Background(), suite.testAccounts["local_account_1"].ID, processing.StatusGrabFunction(suite.db), @@ -60,9 +60,6 @@ func (suite *PruneTestSuite) SetupTest() { processing.StatusPrepareFunction(suite.db, suite.tc), processing.StatusSkipInsertFunction(), ) - if err != nil { - suite.FailNow(err.Error()) - } // put the status IDs in a determinate order since we can't trust a map to keep its order statuses := []*gtsmodel.Status{} @@ -90,20 +87,30 @@ func (suite *PruneTestSuite) TearDownTest() { func (suite *PruneTestSuite) TestPrune() { // prune down to 5 prepared + 5 indexed - suite.Equal(24, suite.timeline.Prune(5, 5)) - suite.Equal(5, suite.timeline.ItemIndexLength(context.Background())) + suite.Equal(12, suite.timeline.Prune(5, 5)) + suite.Equal(5, suite.timeline.Len()) +} + +func (suite *PruneTestSuite) TestPruneTwice() { + // prune down to 5 prepared + 10 indexed + suite.Equal(12, suite.timeline.Prune(5, 10)) + suite.Equal(10, suite.timeline.Len()) + + // Prune same again, nothing should be pruned this time. + suite.Zero(suite.timeline.Prune(5, 10)) + suite.Equal(10, suite.timeline.Len()) } func (suite *PruneTestSuite) TestPruneTo0() { // prune down to 0 prepared + 0 indexed - suite.Equal(34, suite.timeline.Prune(0, 0)) - suite.Equal(0, suite.timeline.ItemIndexLength(context.Background())) + suite.Equal(17, suite.timeline.Prune(0, 0)) + suite.Equal(0, suite.timeline.Len()) } func (suite *PruneTestSuite) TestPruneToInfinityAndBeyond() { // prune to 99999, this should result in no entries being pruned suite.Equal(0, suite.timeline.Prune(99999, 99999)) - suite.Equal(17, suite.timeline.ItemIndexLength(context.Background())) + suite.Equal(17, suite.timeline.Len()) } func TestPruneTestSuite(t *testing.T) { diff --git a/internal/timeline/remove.go b/internal/timeline/remove.go index 62d55a9dc..e76913a2f 100644 --- a/internal/timeline/remove.go +++ b/internal/timeline/remove.go @@ -20,7 +20,6 @@ package timeline import ( "container/list" "context" - "errors" "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -35,52 +34,35 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) { t.Lock() defer t.Unlock() - var removed int - // remove entr(ies) from the post index - removeIndexes := []*list.Element{} - if t.indexedItems != nil && t.indexedItems.data != nil { - for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return removed, errors.New("Remove: could not parse e as a postIndexEntry") - } - if entry.itemID == statusID { - l.Debug("found status in postIndex") - removeIndexes = append(removeIndexes, e) - } + if t.items == nil || t.items.data == nil { + // Nothing to do. + return 0, nil + } + + var toRemove []*list.Element + for e := t.items.data.Front(); e != nil; e = e.Next() { + entry := e.Value.(*indexedItemsEntry) // nolint:forcetypeassert + + if entry.itemID != statusID { + // Not relevant. + continue } - } - for _, e := range removeIndexes { - t.indexedItems.data.Remove(e) - removed++ + + l.Debug("removing item") + toRemove = append(toRemove, e) } - // remove entr(ies) from prepared posts - removePrepared := []*list.Element{} - if t.preparedItems != nil && t.preparedItems.data != nil { - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return removed, errors.New("Remove: could not parse e as a preparedPostsEntry") - } - if entry.itemID == statusID { - l.Debug("found status in preparedPosts") - removePrepared = append(removePrepared, e) - } - } - } - for _, e := range removePrepared { - t.preparedItems.data.Remove(e) - removed++ + for _, e := range toRemove { + t.items.data.Remove(e) } - l.Debugf("removed %d entries", removed) - return removed, nil + return len(toRemove), nil } -func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, error) { - l := log.WithContext(ctx). +func (t *timeline) RemoveAllByOrBoosting(ctx context.Context, accountID string) (int, error) { + l := log. + WithContext(ctx). WithFields(kv.Fields{ {"accountTimeline", t.accountID}, {"accountID", accountID}, @@ -88,46 +70,28 @@ func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, erro t.Lock() defer t.Unlock() - var removed int - // remove entr(ies) from the post index - removeIndexes := []*list.Element{} - if t.indexedItems != nil && t.indexedItems.data != nil { - for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*indexedItemsEntry) - if !ok { - return removed, errors.New("Remove: could not parse e as a postIndexEntry") - } - if entry.accountID == accountID || entry.boostOfAccountID == accountID { - l.Debug("found status in postIndex") - removeIndexes = append(removeIndexes, e) - } + if t.items == nil || t.items.data == nil { + // Nothing to do. + return 0, nil + } + + var toRemove []*list.Element + for e := t.items.data.Front(); e != nil; e = e.Next() { + entry := e.Value.(*indexedItemsEntry) // nolint:forcetypeassert + + if entry.accountID != accountID && entry.boostOfAccountID != accountID { + // Not relevant. + continue } - } - for _, e := range removeIndexes { - t.indexedItems.data.Remove(e) - removed++ + + l.Debug("removing item") + toRemove = append(toRemove, e) } - // remove entr(ies) from prepared posts - removePrepared := []*list.Element{} - if t.preparedItems != nil && t.preparedItems.data != nil { - for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*preparedItemsEntry) - if !ok { - return removed, errors.New("Remove: could not parse e as a preparedPostsEntry") - } - if entry.accountID == accountID || entry.boostOfAccountID == accountID { - l.Debug("found status in preparedPosts") - removePrepared = append(removePrepared, e) - } - } - } - for _, e := range removePrepared { - t.preparedItems.data.Remove(e) - removed++ + for _, e := range toRemove { + t.items.data.Remove(e) } - l.Debugf("removed %d entries", removed) - return removed, nil + return len(toRemove), nil } diff --git a/internal/timeline/timeline.go b/internal/timeline/timeline.go index ab1c0fdb2..d3550e612 100644 --- a/internal/timeline/timeline.go +++ b/internal/timeline/timeline.go @@ -78,32 +78,25 @@ type Timeline interface { INDEXING + PREPARATION FUNCTIONS */ - // IndexOne puts a item into the timeline at the appropriate place according to its 'createdAt' property. - // - // The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false - // if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline. - IndexOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) - // IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its 'createdAt' property, - // and then immediately prepares it. + // IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its id, and then immediately prepares it. // // The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false // if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline. IndexAndPrepareOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) - // PrepareXFromTop instructs the timeline to prepare x amount of items from the top of the timeline, useful during init. - PrepareFromTop(ctx context.Context, amount int) error /* INFO FUNCTIONS */ - // ActualPostIndexLength returns the actual length of the item index at this point in time. - ItemIndexLength(ctx context.Context) int - // OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item, or an error if something goes wrong. - // If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. - OldestIndexedItemID(ctx context.Context) (string, error) - // NewestIndexedItemID returns the id of the frontmost (ie., the newest) indexed item, or an error if something goes wrong. - // If nothing goes wrong but there's no newest item, an empty string will be returned so make sure to check for this. - NewestIndexedItemID(ctx context.Context) (string, error) + // AccountID returns the id of the account this timeline belongs to. + AccountID() string + + // Len returns the length of the item index at this point in time. + Len() int + + // OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item. + // If there's no oldest item, an empty string will be returned so make sure to check for this. + OldestIndexedItemID() string /* UTILITY FUNCTIONS @@ -111,27 +104,29 @@ type Timeline interface { // LastGot returns the time that Get was last called. LastGot() time.Time - // Prune prunes preparedItems and indexedItems in this timeline to the desired lengths. + + // Prune prunes prepared and indexed items in this timeline to the desired lengths. // This will be a no-op if the lengths are already < the desired values. - // Prune acquires a lock on the timeline before pruning. - // The return value is the combined total of items pruned from preparedItems and indexedItems. + // + // The returned int indicates the amount of entries that were removed or unprepared. Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int - // Remove removes a item from both the index and prepared items. + + // Remove removes an item with the given ID. // // If a item has multiple entries in a timeline, they will all be removed. // // The returned int indicates the amount of entries that were removed. Remove(ctx context.Context, itemID string) (int, error) - // RemoveAllBy removes all items by the given accountID, from both the index and prepared items. + + // RemoveAllByOrBoosting removes all items created by or boosting the given accountID. // // The returned int indicates the amount of entries that were removed. - RemoveAllBy(ctx context.Context, accountID string) (int, error) + RemoveAllByOrBoosting(ctx context.Context, accountID string) (int, error) } // timeline fulfils the Timeline interface type timeline struct { - indexedItems *indexedItems - preparedItems *preparedItems + items *indexedItems grabFunction GrabFunction filterFunction FilterFunction prepareFunction PrepareFunction @@ -140,6 +135,10 @@ type timeline struct { sync.Mutex } +func (t *timeline) AccountID() string { + return t.accountID +} + // NewTimeline returns a new Timeline for the given account ID func NewTimeline( ctx context.Context, @@ -148,12 +147,9 @@ func NewTimeline( filterFunction FilterFunction, prepareFunction PrepareFunction, skipInsertFunction SkipInsertFunction, -) (Timeline, error) { +) Timeline { return &timeline{ - indexedItems: &indexedItems{ - skipInsert: skipInsertFunction, - }, - preparedItems: &preparedItems{ + items: &indexedItems{ skipInsert: skipInsertFunction, }, grabFunction: grabFunction, @@ -161,5 +157,5 @@ func NewTimeline( prepareFunction: prepareFunction, accountID: timelineAccountID, lastGot: time.Time{}, - }, nil + } } diff --git a/internal/timeline/timeline_test.go b/internal/timeline/timeline_test.go index 2207a3418..fb1859fc4 100644 --- a/internal/timeline/timeline_test.go +++ b/internal/timeline/timeline_test.go @@ -34,8 +34,10 @@ type TimelineStandardTestSuite struct { tc typeutils.TypeConverter filter *visibility.Filter - testAccounts map[string]*gtsmodel.Account - testStatuses map[string]*gtsmodel.Status + testAccounts map[string]*gtsmodel.Account + testStatuses map[string]*gtsmodel.Status + highestStatusID string + lowestStatusID string timeline timeline.Timeline manager timeline.Manager diff --git a/internal/timeline/timelineable.go b/internal/timeline/types.go similarity index 77% rename from internal/timeline/timelineable.go rename to internal/timeline/types.go index 781550ef8..6243799f5 100644 --- a/internal/timeline/timelineable.go +++ b/internal/timeline/types.go @@ -17,10 +17,18 @@ package timeline -// Timelineable represents any item that can be put in a timeline. +// Timelineable represents any item that can be indexed in a timeline. type Timelineable interface { GetID() string GetAccountID() string GetBoostOfID() string GetBoostOfAccountID() string } + +// Preparable represents any item that can be prepared in a timeline. +type Preparable interface { + GetID() string + GetAccountID() string + GetBoostOfID() string + GetBoostOfAccountID() string +}