mirror of
1
Fork 0

[feature] Implement `deliveryRecipientPreSort` to prioritize delivery to mentioned accounts (#3668)

* weeeeenus

* update to latest activity

* update to use latest release tag of superseriousbusiness/activity

---------

Co-authored-by: kim <grufwub@gmail.com>
This commit is contained in:
tobi 2025-01-27 19:22:15 +01:00 committed by GitHub
parent 9048290948
commit 65fb8abd42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 386 additions and 165 deletions

2
go.mod
View File

@ -70,7 +70,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/superseriousbusiness/activity v1.9.0-gts
github.com/superseriousbusiness/activity v1.10.0-gts
github.com/superseriousbusiness/httpsig v1.2.0-SSB
github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB.0.20230227143000-f4900831d6c8
github.com/tdewolff/minify/v2 v2.21.2

4
go.sum generated
View File

@ -533,8 +533,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/superseriousbusiness/activity v1.9.0-gts h1:qWMDeiGdnVi+XG7CfuM7ET87qe9adousU6utWItBX/o=
github.com/superseriousbusiness/activity v1.9.0-gts/go.mod h1:9l74ZCv8zw07vipNMzahq8oQZt2xPaJZ+L+gLicQntQ=
github.com/superseriousbusiness/activity v1.10.0-gts h1:uYIHU0/jDpLxj0lA3Jg24lM8p3X/Vb3J7hn3yQJR+C8=
github.com/superseriousbusiness/activity v1.10.0-gts/go.mod h1:9l74ZCv8zw07vipNMzahq8oQZt2xPaJZ+L+gLicQntQ=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe/go.mod h1:gH4P6gN1V+wmIw5o97KGaa1RgXB/tVpC2UNzijhg3E4=
github.com/superseriousbusiness/go-png-image-structure/v2 v2.0.1-SSB h1:8psprYSK1KdOSH7yQ4PbJq0YYaGQY+gzdW/B0ExDb/8=

View File

@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"net/url"
"slices"
errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-kv"
@ -30,9 +31,11 @@ import (
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/ap"
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/uris"
)
// federatingActor wraps the pub.FederatingActor
@ -42,10 +45,63 @@ type federatingActor struct {
wrapped pub.FederatingActor
}
// deliveryRecipientPreSort sorts the given recipient IRIs in place, and
// returns the same slice, so that IRIs identified as our own followers
// collections end up *after* every other IRI.
//
// Rationale: the only collection IRIs we ever address are our own followers
// collections, so host + path matching is enough to identify them. Pushing
// them to the back means directly addressed (ie., mentioned) accounts get
// delivery-attempted first, and only then do delivery attempts move on to
// the author's followers. This should make conversation threads feel
// snappier, as replies reach participants sooner.
func deliveryRecipientPreSort(actorAndCollectionIRIs []*url.URL) []*url.URL {
	host := config.GetHost()
	accountDomain := config.GetAccountDomain()

	// isOwnFollowers reports whether the IRI is on one of
	// our domains AND has a path that uris.IsFollowersPath
	// accepts. The host check comes first so that the path
	// check is only performed for IRIs on our own domains.
	isOwnFollowers := func(iri *url.URL) bool {
		if iri.Host != host && iri.Host != accountDomain {
			return false
		}
		return uris.IsFollowersPath(iri)
	}

	slices.SortFunc(actorAndCollectionIRIs, func(a, b *url.URL) int {
		aFollowers, bFollowers := isOwnFollowers(a), isOwnFollowers(b)

		if aFollowers == bFollowers {
			// Either both are followers IRIs or
			// neither is; relative order is irrelevant.
			return 0
		}

		if aFollowers {
			// Only a is a followers
			// IRI: b goes before a.
			return 1
		}

		// Only b is a followers
		// IRI: a goes before b.
		return -1
	})

	return actorAndCollectionIRIs
}
// newFederatingActor returns a federatingActor.
func newFederatingActor(c pub.CommonBehavior, s2s pub.FederatingProtocol, db pub.Database, clock pub.Clock) pub.FederatingActor {
sideEffectActor := pub.NewSideEffectActor(c, s2s, nil, db, clock)
sideEffectActor.Serialize = ap.Serialize // hook in our own custom Serialize function
// Hook in our own custom Serialize function.
sideEffectActor.Serialize = ap.Serialize
// Hook in our own custom recipient pre-sort function.
sideEffectActor.DeliveryRecipientPreSort = deliveryRecipientPreSort
return &federatingActor{
sideEffectActor: sideEffectActor,

View File

@ -20,7 +20,7 @@ var _ DelegateActor = &SideEffectActor{}
// Note that when using the SideEffectActor with an application that good-faith
// implements its required interfaces, the ActivityPub specification is
// guaranteed to be correctly followed.
//
type SideEffectActor struct {
// When doing deliveries to remote servers via the s2s protocol, the side effect
// actor will by default use the Serialize function from the streams package.
// However, this can be overridden after the side effect actor is instantiated,
@ -36,9 +36,42 @@ var _ DelegateActor = &SideEffectActor{}
// likely cause race conditions or other problems! In most cases, you will never
// need to change this; it's provided solely to allow easier customization by
// applications.
type SideEffectActor struct {
Serialize func(a vocab.Type) (m map[string]interface{}, e error)
// When doing deliveries to remote servers via the s2s protocol, it may be desirable
// for implementations to be able to pre-sort recipients so that higher-priority
// recipients are higher up in the delivery queue, and lower-priority recipients
// are further down. This can be achieved by setting the DeliveryRecipientPreSort
// function on the side effect actor after it's instantiated. For example:
//
// a := NewSideEffectActor(...)
// a.DeliveryRecipientPreSort = func(actorAndCollectionIRIs []*url.URL) []*url.URL {
// // Put your sorting logic here.
// }
//
// The actorAndCollectionIRIs parameter will be the initial list of IRIs derived by
// looking at the "to", "cc", "bto", "bcc", and "audience" properties of the activity
// being delivered, excluding the AP public IRI, and before dereferencing of inboxes.
// It may look something like this:
//
// [
// "https://example.org/users/someone/followers", // <-- collection IRI
// "https://another.example.org/users/someone_else", // <-- actor IRI
// "[...]" // <-- etc
// ]
//
// In this case, implementers may wish to sort the slice so that the directly-addressed
// actor "https://another.example.org/users/someone_else" occurs at an earlier index in
// the slice than the followers collection "https://example.org/users/someone/followers",
// so that "@someone_else" receives the delivery first.
//
// Note that you should only do this *immediately* after instantiating the side
// effect actor -- never while your application is already running, as this will
// likely cause race conditions or other problems! It's also completely fine to not
// set this function at all -- in this case, no pre-sorting of recipients will be
// performed, and delivery will occur in a non-determinate order.
DeliveryRecipientPreSort func(actorAndCollectionIRIs []*url.URL) []*url.URL
common CommonBehavior
s2s FederatingProtocol
c2s SocialProtocol
@ -652,195 +685,315 @@ func (a *SideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *
return false, nil
}
// prepare takes a deliverableObject and returns a list of the proper recipient
// target URIs. Additionally, the deliverableObject will have any hidden
// recipients ("bto" and "bcc") stripped from it.
// prepare takes a deliverableObject and returns a list of the final
// recipient inbox IRIs. Additionally, the deliverableObject will have
// any hidden recipients ("bto" and "bcc") stripped from it.
//
// Only call if both the social and federated protocol are supported.
func (a *SideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activity Activity) (r []*url.URL, err error) {
// Get inboxes of recipients
func (a *SideEffectActor) prepare(
ctx context.Context,
outboxIRI *url.URL,
activity Activity,
) ([]*url.URL, error) {
// Iterate through to, bto, cc, bcc, and audience
// to extract a slice of addressee IRIs / IDs.
//
// The resulting slice might look something like:
//
// [
// "https://example.org/users/someone/followers", // <-- collection IRI
// "https://another.example.org/users/someone_else", // <-- actor IRI
// "[...]" // <-- etc
// ]
var actorsAndCollections []*url.URL
if to := activity.GetActivityStreamsTo(); to != nil {
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
var err error
actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
iter, actorsAndCollections,
)
if err != nil {
return
}
r = append(r, val)
return nil, err
}
}
}
if bto := activity.GetActivityStreamsBto(); bto != nil {
for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
var err error
actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
iter, actorsAndCollections,
)
if err != nil {
return
}
r = append(r, val)
return nil, err
}
}
}
if cc := activity.GetActivityStreamsCc(); cc != nil {
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
var err error
actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
iter, actorsAndCollections,
)
if err != nil {
return
}
r = append(r, val)
return nil, err
}
}
}
if bcc := activity.GetActivityStreamsBcc(); bcc != nil {
for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
var err error
actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
iter, actorsAndCollections,
)
if err != nil {
return
}
r = append(r, val)
return nil, err
}
}
}
if audience := activity.GetActivityStreamsAudience(); audience != nil {
for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
var err error
actorsAndCollections, err = appendToActorsAndCollectionsIRIs(
iter, actorsAndCollections,
)
if err != nil {
return
}
r = append(r, val)
return nil, err
}
}
}
// 1. When an object is being delivered to the originating actor's
// followers, a server MAY reduce the number of receiving actors
// delivered to by identifying all followers which share the same
// sharedInbox who would otherwise be individual recipients and
// instead deliver objects to said sharedInbox.
// 2. If an object is addressed to the Public special collection, a
// server MAY deliver that object to all known sharedInbox endpoints
// on the network.
r = filterURLs(r, IsPublic)
// first check if the implemented database logic can return any inboxes
// from our list of actor IRIs.
foundInboxesFromDB := []*url.URL{}
for _, actorIRI := range r {
// PRE-SORTING
// If the pre-delivery sort function is defined,
// call it now so that implementations can sort
// delivery order to their preferences.
if a.DeliveryRecipientPreSort != nil {
actorsAndCollections = a.DeliveryRecipientPreSort(actorsAndCollections)
}
// We now need to dereference the actor or collection
// IRIs to derive inboxes that we can POST requests to.
var (
inboxes = make([]*url.URL, 0, len(actorsAndCollections))
derefdEntries = make(map[string]struct{}, len(actorsAndCollections))
)
// First check if the implemented database logic
// can return any of these inboxes without having
// to make remote dereference calls (much cheaper).
for _, actorOrCollection := range actorsAndCollections {
actorOrCollectionStr := actorOrCollection.String()
if _, derefd := derefdEntries[actorOrCollectionStr]; derefd {
// Ignore potential duplicates
// we've already derefd to inbox(es).
continue
}
// BEGIN LOCK
var unlock func()
unlock, err = a.db.Lock(c, actorIRI)
unlock, err := a.db.Lock(ctx, actorOrCollection)
if err != nil {
return
}
inboxes, err := a.db.InboxesForIRI(c, actorIRI)
if err != nil {
// bail on error
unlock()
return nil, err
}
if len(inboxes) > 0 {
// we have a hit
foundInboxesFromDB = append(foundInboxesFromDB, inboxes...)
// if we found inboxes for this iri, we should remove it from
// the list of actors/iris we still need to dereference
r = removeOne(r, actorIRI)
}
// Try to get inbox(es) for this actor or collection.
gotInboxes, err := a.db.InboxesForIRI(ctx, actorOrCollection)
// END LOCK
unlock()
}
// look for any actors' inboxes that weren't already discovered above;
// find these by making dereference calls to remote instances
t, err := a.common.NewTransport(c, outboxIRI, goFedUserAgent())
if err != nil {
return nil, err
}
foundActorsFromRemote, err := a.resolveActors(c, t, r, 0, a.s2s.MaxDeliveryRecursionDepth(c))
if err != nil {
return nil, err
}
foundInboxesFromRemote, err := getInboxes(foundActorsFromRemote)
if err != nil {
return nil, err
}
// combine this list of dereferenced inbox IRIs with the inboxes we already
// found in the db, to make a complete list of target IRIs
targets := []*url.URL{}
targets = append(targets, foundInboxesFromDB...)
targets = append(targets, foundInboxesFromRemote...)
if len(gotInboxes) == 0 {
// No hit(s).
continue
}
// Get inboxes of sender.
var unlock func()
unlock, err = a.db.Lock(c, outboxIRI)
if err != nil {
return
// We have one or more hits.
inboxes = append(inboxes, gotInboxes...)
// Mark this actor or collection as deref'd.
derefdEntries[actorOrCollectionStr] = struct{}{}
}
// WARNING: No deferring the Unlock
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
unlock() // unlock after regardless
if err != nil {
return
}
// Get the inbox on the sender.
unlock, err = a.db.Lock(c, actorIRI)
// Now look for any remaining actors/collections
// that weren't already dereferenced into inboxes
// with db calls; find these by making deref calls
// to remote instances.
//
// First get a transport to do the http calls.
t, err := a.common.NewTransport(ctx, outboxIRI, goFedUserAgent())
if err != nil {
return nil, err
}
// Make HTTP calls to unpack collection IRIs into
// Actor IRIs and then into Actor types, ignoring
// actors or collections we've already deref'd.
actorsFromRemote, err := a.resolveActors(
ctx,
t,
actorsAndCollections,
derefdEntries,
0, a.s2s.MaxDeliveryRecursionDepth(ctx),
)
if err != nil {
return nil, err
}
// Release no-longer-needed collections.
clear(derefdEntries)
clear(actorsAndCollections)
// Extract inbox IRI from each deref'd Actor (if any).
inboxesFromRemote, err := actorsToInboxIRIs(actorsFromRemote)
if err != nil {
return nil, err
}
// Combine db-discovered inboxes and remote-discovered
// inboxes into a final list of destination inboxes.
inboxes = append(inboxes, inboxesFromRemote...)
// POST FILTERING
// Do a final pass of the inboxes to:
//
// 1. Deduplicate entries.
// 2. Ensure that the list of inboxes doesn't
// contain the inbox of whoever the outbox
// belongs to, no point delivering to oneself.
//
// To do this we first need to get the
// inbox IRI of this outbox's Actor.
// BEGIN LOCK
thisActor, err := a.db.Get(c, actorIRI)
unlock, err := a.db.Lock(ctx, outboxIRI)
if err != nil {
return nil, err
}
// Get the IRI of the Actor who owns this outbox.
outboxActorIRI, err := a.db.ActorForOutbox(ctx, outboxIRI)
// END LOCK
unlock()
// END LOCK -- Still need to handle err
if err != nil {
return nil, err
}
// Post-processing
var ignore *url.URL
ignore, err = getInbox(thisActor)
// BEGIN LOCK
unlock, err = a.db.Lock(ctx, outboxActorIRI)
if err != nil {
return nil, err
}
r = dedupeIRIs(targets, []*url.URL{ignore})
// Now get the Actor who owns this outbox.
outboxActor, err := a.db.Get(ctx, outboxActorIRI)
// END LOCK
unlock()
if err != nil {
return nil, err
}
// Extract the inbox IRI for the outbox Actor.
inboxOfOutboxActor, err := getInbox(outboxActor)
if err != nil {
return nil, err
}
// Deduplicate the final inboxes slice, and filter
// out the inbox of this outbox actor (if present).
inboxes = filterInboxIRIs(inboxes, inboxOfOutboxActor)
// Now that we've derived inboxes to deliver
// the activity to, strip off any bto or bcc
// recipients, as per the AP spec requirements.
stripHiddenRecipients(activity)
return r, nil
// All done!
return inboxes, nil
}
// resolveActors takes a list of Actor id URIs and returns them as concrete
// instances of actorObject. It attempts to apply recursively when it encounters
// a target that is a Collection or OrderedCollection.
//
// Any IRI strings in the ignores map will be skipped (use this when
// you've already dereferenced some of the actorAndCollectionIRIs).
//
// If maxDepth is zero or negative, then recursion is infinitely applied.
//
// If a recipient is a Collection or OrderedCollection, then the server MUST
// dereference the collection, WITH the user's credentials.
//
// Note that this also applies to CollectionPage and OrderedCollectionPage.
func (a *SideEffectActor) resolveActors(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) {
func (a *SideEffectActor) resolveActors(
ctx context.Context,
t Transport,
actorAndCollectionIRIs []*url.URL,
ignores map[string]struct{},
depth, maxDepth int,
) ([]vocab.Type, error) {
if maxDepth > 0 && depth >= maxDepth {
return
// Hit our max depth.
return nil, nil
}
for _, u := range r {
var act vocab.Type
var more []*url.URL
// TODO: Determine if more logic is needed here for inaccessible
// collections owned by peer servers.
act, more, err = a.dereferenceForResolvingInboxes(c, t, u)
if len(actorAndCollectionIRIs) == 0 {
// Nothing to do.
return nil, nil
}
// Optimistically assume 1:1 mapping of IRIs to actors.
actors := make([]vocab.Type, 0, len(actorAndCollectionIRIs))
// Deref each actorOrCollectionIRI if not ignored.
for _, actorOrCollectionIRI := range actorAndCollectionIRIs {
_, ignore := ignores[actorOrCollectionIRI.String()]
if ignore {
// Don't try to
// deref this one.
continue
}
// TODO: Determine if more logic is needed here for
// inaccessible collections owned by peer servers.
actor, more, err := a.dereferenceForResolvingInboxes(ctx, t, actorOrCollectionIRI)
if err != nil {
// Missing recipient -- skip.
continue
}
var recurActors []vocab.Type
recurActors, err = a.resolveActors(c, t, more, depth+1, maxDepth)
if actor != nil {
// Got a hit.
actors = append(actors, actor)
}
// If this was a collection, get more.
recurActors, err := a.resolveActors(
ctx,
t,
more,
ignores,
depth+1, maxDepth,
)
if err != nil {
return
}
if act != nil {
actors = append(actors, act)
return nil, err
}
actors = append(actors, recurActors...)
}
return
return actors, nil
}
// dereferenceForResolvingInboxes dereferences an IRI solely for finding an

View File

@ -385,19 +385,6 @@ func wrapInCreate(ctx context.Context, o vocab.Type, actor *url.URL) (c vocab.Ac
return
}
// filterURLs removes every URL whose string form is matched by fn, and
// returns the remaining URLs in their original relative order.
//
// The filtering happens in place in a single pass: the returned slice
// shares (and overwrites the front of) u's backing array, so callers must
// use the returned slice and not the one they passed in. A nil or empty
// input is returned as-is.
func filterURLs(u []*url.URL, fn func(s string) bool) []*url.URL {
	// Reuse the backing array; writes never overtake reads
	// because we only ever keep <= the number of elements seen.
	kept := u[:0]
	for _, iri := range u {
		if fn(iri.String()) {
			// Matched the filter, drop it.
			continue
		}
		kept = append(kept, iri)
	}
	return kept
}
const (
// PublicActivityPubIRI is the IRI that indicates an Activity is meant
// to be visible for general public consumption.
@ -412,8 +399,28 @@ func IsPublic(s string) bool {
return s == PublicActivityPubIRI || s == publicJsonLD || s == publicJsonLDAS
}
// getInboxes extracts the 'inbox' IRIs from actor types.
func getInboxes(t []vocab.Type) (u []*url.URL, err error) {
// appendToActorsAndCollectionsIRIs resolves the ID IRI of the given
// IdProperty via ToId and appends it to actorsAndCollections, unless
// IsPublic reports that it is the magic ActivityPub Public IRI.
//
// It returns the (possibly grown) slice, or a nil slice and the
// error if the ID could not be derived from the property.
func appendToActorsAndCollectionsIRIs(
	iter IdProperty,
	actorsAndCollections []*url.URL,
) ([]*url.URL, error) {
	iri, err := ToId(iter)
	if err != nil {
		return nil, err
	}

	if IsPublic(iri.String()) {
		// The Public IRI isn't something we can
		// deliver to directly, so leave it out.
		return actorsAndCollections, nil
	}

	return append(actorsAndCollections, iri), nil
}
// actorsToInboxIRIs extracts the 'inbox' IRIs from actor types.
func actorsToInboxIRIs(t []vocab.Type) (u []*url.URL, err error) {
for _, elem := range t {
var iri *url.URL
iri, err = getInbox(elem)
@ -436,32 +443,37 @@ func getInbox(t vocab.Type) (u *url.URL, err error) {
return ToId(inbox)
}
// dedupeIRIs will deduplicate final inbox IRIs. The ignore list is applied to
// the final list.
func dedupeIRIs(recipients, ignored []*url.URL) (out []*url.URL) {
ignoredMap := make(map[string]bool, len(ignored))
for _, elem := range ignored {
ignoredMap[elem.String()] = true
}
outMap := make(map[string]bool, len(recipients))
for _, k := range recipients {
kStr := k.String()
if !ignoredMap[kStr] && !outMap[kStr] {
out = append(out, k)
outMap[kStr] = true
}
}
return
// filterInboxIRIs will deduplicate the given inboxes
// slice, while also leaving out any filtered IRIs.
func filterInboxIRIs(
inboxes []*url.URL,
filtered ...*url.URL,
) []*url.URL {
// Prepopulate the ignored map with each filtered IRI.
ignored := make(map[string]struct{}, len(filtered)+len(inboxes))
for _, filteredIRI := range filtered {
ignored[filteredIRI.String()] = struct{}{}
}
// removeOne removes any occurrences of entry from a slice of entries.
func removeOne(entries []*url.URL, entry *url.URL) (out []*url.URL) {
for _, e := range entries {
if e.String() != entry.String() {
out = append(out, e)
deduped := make([]*url.URL, 0, len(inboxes))
for _, inbox := range inboxes {
inboxStr := inbox.String()
_, ignore := ignored[inboxStr]
if ignore {
// We already included this
// IRI in deduped, or we
// should ignore it.
continue
}
// Include this IRI in output, and
// add entry to the ignored map to
// ensure we don't include it again.
deduped = append(deduped, inbox)
ignored[inboxStr] = struct{}{}
}
return out
return deduped
}
// stripHiddenRecipients removes "bto" and "bcc" from the activity.

2
vendor/modules.txt vendored
View File

@ -660,7 +660,7 @@ github.com/stretchr/testify/suite
# github.com/subosito/gotenv v1.6.0
## explicit; go 1.18
github.com/subosito/gotenv
# github.com/superseriousbusiness/activity v1.9.0-gts
# github.com/superseriousbusiness/activity v1.10.0-gts
## explicit; go 1.21
github.com/superseriousbusiness/activity/pub
github.com/superseriousbusiness/activity/streams