diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index ad677b228..d35627ff1 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -56,9 +56,20 @@ func (d *deref) DereferenceStatusAncestors( username string, status *gtsmodel.Status, ) error { + // Start log entry with fields + l := log.WithContext(ctx). + WithFields(kv.Fields{ + {"username", username}, + {"original", status.URI}, + }...) + + // Keep track of already dereferenced statuses + // for this ancestor thread to prevent recursion. + derefdStatuses := make(map[string]struct{}, 10) + // Mark given status as the one // we're currently working on. - var current = status + current := status for i := 0; i < maxIter; i++ { if current.InReplyToURI == "" { @@ -67,14 +78,21 @@ func (d *deref) DereferenceStatusAncestors( return nil } - l := log. - WithContext(ctx). - WithFields(kv.Fields{ - {"username", username}, - {"originalStatusIRI", status.URI}, - {"currentStatusURI", current.URI}, - {"currentInReplyToURI", current.InReplyToURI}, - }...) + // Add new log fields for this iteration. + l = l.WithFields(kv.Fields{ + {"current", current.URI}, + {"parent", current.InReplyToURI}, + }...) + l.Trace("following status ancestors") + + // Check whether this parent has already been deref'd. + if _, ok := derefdStatuses[current.InReplyToURI]; ok { + l.Warn("self referencing status ancestors") + return nil + } + + // Add this status URI to map of deref'd. + derefdStatuses[current.URI] = struct{}{} if current.InReplyToID != "" { // We already have an InReplyToID set. This means @@ -123,7 +141,7 @@ func (d *deref) DereferenceStatusAncestors( // by another action. // // TODO: clean this up in a nightly task. - l.Warnf("current status has been orphaned (parent %s no longer exists in database)", current.InReplyToID) + l.Warn("orphaned status (parent no longer exists)") return nil // Cannot iterate further. } @@ -134,16 +152,15 @@ func (d *deref) DereferenceStatusAncestors( inReplyToURI, err := url.Parse(current.InReplyToURI) if err != nil || inReplyToURI == nil { // Parent URI is not something we can handle. - l.Debug("current status has been orphaned (invalid InReplyToURI)") + l.Warn("orphaned status (invalid InReplyToURI)") return nil //nolint:nilerr } // Parent URI is valid, try to get it. // getStatusByURI guards against the following conditions: - // + // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) - // - domain is local (will try to return something, or - // return unretrievable). + // - any http type error for a new status returns unretrievable parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI) if err == nil { // We successfully fetched the parent. @@ -171,7 +188,7 @@ func (d *deref) DereferenceStatusAncestors( case code == http.StatusGone: // 410 means the status has definitely been deleted. // Update this status to reflect that, then bail. - l.Debug("current status has been orphaned (call to parent returned code 410 Gone)") + l.Debug("orphaned status: parent returned 410 Gone") current.InReplyToURI = "" if err := d.state.DB.UpdateStatus( @@ -180,19 +197,19 @@ func (d *deref) DereferenceStatusAncestors( ); err != nil { return gtserror.Newf("db error updating status %s: %w", current.ID, err) } + return nil case code != 0: - // We had a code, but not one indicating deletion, - // log the code but don't return error or update the - // status; we can try again later. - l.Warnf("cannot dereference parent (%q)", err) + // We had a code, but not one indicating deletion, log the code + // but don't return error or update the status; we can try again later. + l.Warnf("orphaned status: http error dereferencing parent: %v)", err) return nil case gtserror.Unretrievable(err): // Not retrievable for some other reason, so just - // bail; we can try again later if necessary. - l.Debugf("parent unretrievable (%q)", err) + // bail for now; we can try again later if necessary. + l.Warnf("orphaned status: parent unretrievable: %v)", err) return nil default: @@ -205,35 +222,44 @@ func (d *deref) DereferenceStatusAncestors( } func (d *deref) DereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error { - // Take ref to original - ogIRI := statusIRI + statusIRIStr := statusIRI.String() // Start log entry with fields l := log.WithContext(ctx). WithFields(kv.Fields{ {"username", username}, - {"statusIRI", ogIRI}, + {"status", statusIRIStr}, }...) // Log function start l.Trace("beginning") - // frame represents a single stack frame when iteratively - // dereferencing status descendants. where statusIRI and - // statusable are of the status whose children we are to - // descend, page is the current activity streams collection - // page of entities we are on (as we often push a frame to - // stack mid-paging), and item___ are entity iterators for - // this activity streams collection page. + // OUR instance hostname. + localhost := config.GetHost() + + // Keep track of already dereferenced collection + // pages for this thread to prevent recursion. + derefdPages := make(map[string]struct{}, 10) + + // frame represents a single stack frame when + // iteratively derefencing status descendants. type frame struct { - statusIRI *url.URL - statusable ap.Statusable - page ap.CollectionPageable - itemIter vocab.ActivityStreamsItemsPropertyIterator + // page is the current activity streams + // collection page we are on (as we often + // push a frame to stack mid-paging). + page ap.CollectionPageable + + // pageURI is the URI string of + // the frame's collection page + // (is useful for logging). + pageURI string + + // items is the entity iterator for frame's page. + items vocab.ActivityStreamsItemsPropertyIterator } var ( - // current is the current stack frame + // current stack frame current *frame // stack is a list of "shelved" descendand iterator @@ -242,11 +268,14 @@ func (d *deref) DereferenceStatusDescendants(ctx context.Context, username strin // popped from into 'current' when that child's tree // of further descendants is exhausted. stack = []*frame{ - { - // Starting input is first frame - statusIRI: statusIRI, - statusable: parent, - }, + func() *frame { + // Start input frame is built from the first input. + page, pageURI := getAttachedStatusCollection(parent) + if page == nil { + return nil + } + return &frame{page: page, pageURI: pageURI} + }(), } // popStack will remove and return the top frame @@ -274,42 +303,9 @@ stackLoop: return nil } - if current.page == nil { - if current.statusIRI.Host == config.GetHost() { - // This is a local status, no looping to do - continue stackLoop - } - - l.Tracef("following remote status descendants: %s", current.statusIRI) - - // Look for an attached status replies (as collection) - replies := current.statusable.GetActivityStreamsReplies() - if replies == nil { - continue stackLoop - } - - // Get the status replies collection - collection := replies.GetActivityStreamsCollection() - if collection == nil { - continue stackLoop - } - - // Get the "first" property of the replies collection - first := collection.GetActivityStreamsFirst() - if first == nil { - continue stackLoop - } - - // Set the first activity stream collection page - current.page = first.GetActivityStreamsCollectionPage() - if current.page == nil { - continue stackLoop - } - } - pageLoop: for { - if current.itemIter == nil { + if current.items == nil { // Get the items associated with this page items := current.page.GetActivityStreamsItems() if items == nil { @@ -317,21 +313,23 @@ stackLoop: } // Start off the item iterator - current.itemIter = items.Begin() + current.items = items.Begin() } + l.Tracef("following collection page: %s", current.pageURI) + itemLoop: for { // Check for remaining iter - if current.itemIter == nil { + if current.items == nil { break itemLoop } // Get current item iterator - itemIter := current.itemIter + itemIter := current.items // Set the next available iterator - current.itemIter = itemIter.Next() + current.items = itemIter.Next() // Check for available IRI on item itemIRI, _ := pub.ToId(itemIter) @@ -339,76 +337,123 @@ stackLoop: continue itemLoop } - if itemIRI.Host == config.GetHost() { + if itemIRI.Host == localhost { // This child is one of ours, continue itemLoop } // Dereference the remote status and store in the database. // getStatusByURI guards against the following conditions: - // + // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) - // - domain is local (will try to return something, or - // return unretrievable). + // - any http type error for a new status returns unretrievable _, statusable, err := d.getStatusByURI(ctx, username, itemIRI) if err != nil { if !gtserror.Unretrievable(err) { l.Errorf("error dereferencing remote status %s: %v", itemIRI, err) } - continue itemLoop } if statusable == nil { - // Already up-to-date. + // A nil statusable return from + // getStatusByURI() indicates a + // remote status that was already + // dereferenced recently (so no + // need to go through descendents). + continue itemLoop + } + + // Extract any attached collection + URI from status. + page, pageURI := getAttachedStatusCollection(statusable) + if page == nil { continue itemLoop } // Put current and next frame at top of stack stack = append(stack, current, &frame{ - statusIRI: itemIRI, - statusable: statusable, + pageURI: pageURI, + page: page, }) // Now start at top of loop continue stackLoop } - // Get the current page's "next" property + // Get the current page's "next" property. pageNext := current.page.GetActivityStreamsNext() if pageNext == nil || !pageNext.IsIRI() { continue stackLoop } // Get the IRI of the "next" property. - pageNextIRI := pageNext.GetIRI() + pageNextURI := pageNext.GetIRI() + pageNextURIStr := pageNextURI.String() - // Ensure this isn't a self-referencing page... - // We don't need to store / check against a map of IRIs - // as our getStatusByIRI() function above prevents iter'ing - // over statuses that have been dereferenced recently, due to - // the `fetched_at` field preventing frequent refetches. - if id := current.page.GetJSONLDId(); id != nil && - pageNextIRI.String() == id.Get().String() { - log.Warnf(ctx, "self referencing collection page: %s", pageNextIRI) + // Check whether this page has already been deref'd. + if _, ok := derefdPages[pageNextURIStr]; ok { + l.Warnf("self referencing collection page(s): %s", pageNextURIStr) continue stackLoop } - // Dereference this next collection page by its IRI + // Mark this collection page as deref'd. + derefdPages[pageNextURIStr] = struct{}{} + + // Dereference this next collection page by its IRI. collectionPage, err := d.dereferenceCollectionPage(ctx, username, - pageNextIRI, + pageNextURI, ) if err != nil { - l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err) + l.Errorf("error dereferencing collection page %q: %s", pageNextURIStr, err) continue stackLoop } - // Set the updated collection page + // Set the next collection page. current.page = collectionPage + current.pageURI = pageNextURIStr continue pageLoop } } - return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String()) + return gtserror.Newf("reached %d descendant iterations for %q", maxIter, statusIRIStr) +} + +// getAttachedStatusCollection is a small utility function to fetch the first page +// of an attached activity streams collection from a provided statusable object . +func getAttachedStatusCollection(status ap.Statusable) (page ap.CollectionPageable, uri string) { //nolint:gocritic + // Look for an attached status replies (as collection) + replies := status.GetActivityStreamsReplies() + if replies == nil { + return nil, "" + } + + // Get the status replies collection + collection := replies.GetActivityStreamsCollection() + if collection == nil { + return nil, "" + } + + // Get the "first" property of the replies collection + first := collection.GetActivityStreamsFirst() + if first == nil { + return nil, "" + } + + // Return the first activity stream collection page + page = first.GetActivityStreamsCollectionPage() + if page == nil { + return nil, "" + } + + if pageID := page.GetJSONLDId(); pageID != nil { + // By default use collection JSONLD ID + return page, pageID.Get().String() + } else if statusID := status.GetJSONLDId(); statusID != nil { + // Else, if possible use status JSONLD ID + return page, statusID.Get().String() + } else { + // MUST have some kind of ID + return nil, "" + } }