mirror of
1
Fork 0

federating db updates (#118)

This commit is contained in:
Tobi Smethurst 2021-07-27 10:45:22 +02:00 committed by GitHub
parent a04888e9a6
commit 2c2dbe318e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 79 deletions

View File

@ -31,6 +31,19 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
}
l.Debugf("received ACCEPT asType %s", string(b))
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("ACCEPT: target account was set on context but couldn't be parsed")
return nil
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("ACCEPT: from federator channel wasn't set on context")
@ -42,17 +55,6 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return nil
}
inboxAcctI := ctx.Value(util.APAccount)
if inboxAcctI == nil {
l.Error("ACCEPT: inbox account wasn't set on context")
return nil
}
inboxAcct, ok := inboxAcctI.(*gtsmodel.Account)
if !ok {
l.Error("ACCEPT: inbox account was set on context but couldn't be parsed")
return nil
}
acceptObject := accept.GetActivityStreamsObject()
if acceptObject == nil {
return errors.New("ACCEPT: no object set on vocab.ActivityStreamsAccept")
@ -71,7 +73,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
}
// make sure the addressee of the original follow is the same as whatever inbox this landed in
if gtsFollowRequest.AccountID != inboxAcct.ID {
if gtsFollowRequest.AccountID != targetAcct.ID {
return errors.New("ACCEPT: follow object account and inbox account were not the same")
}
follow, err := f.db.AcceptFollowRequest(gtsFollowRequest.AccountID, gtsFollowRequest.TargetAccountID)
@ -83,7 +85,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
APObjectType: gtsmodel.ActivityStreamsFollow,
APActivityType: gtsmodel.ActivityStreamsAccept,
GTSModel: follow,
ReceivingAccount: inboxAcct,
ReceivingAccount: targetAcct,
}
return nil
@ -108,7 +110,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return fmt.Errorf("ACCEPT: error converting asfollow to gtsfollow: %s", err)
}
// make sure the addressee of the original follow is the same as whatever inbox this landed in
if gtsFollow.AccountID != inboxAcct.ID {
if gtsFollow.AccountID != targetAcct.ID {
return errors.New("ACCEPT: follow object account and inbox account were not the same")
}
follow, err := f.db.AcceptFollowRequest(gtsFollow.AccountID, gtsFollow.TargetAccountID)
@ -120,7 +122,7 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
APObjectType: gtsmodel.ActivityStreamsFollow,
APActivityType: gtsmodel.ActivityStreamsAccept,
GTSModel: follow,
ReceivingAccount: inboxAcct,
ReceivingAccount: targetAcct,
}
return nil

View File

@ -31,29 +31,31 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
l.Error("target account wasn't set on context")
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("target account was set on context but couldn't be parsed")
l.Error("ANNOUNCE: target account was set on context but couldn't be parsed")
return nil
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("from federator channel wasn't set on context")
l.Error("ANNOUNCE: from federator channel wasn't set on context")
return nil
}
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
if !ok {
l.Error("from federator channel was set on context but couldn't be parsed")
l.Error("ANNOUNCE: from federator channel was set on context but couldn't be parsed")
return nil
}
boost, isNew, err := f.typeConverter.ASAnnounceToStatus(announce)
if err != nil {
return fmt.Errorf("Announce: error converting announce to boost: %s", err)
return fmt.Errorf("ANNOUNCE: error converting announce to boost: %s", err)
}
if !isNew {

View File

@ -65,23 +65,25 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
l.Error("target account wasn't set on context")
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("target account was set on context but couldn't be parsed")
l.Error("CREATE: target account was set on context but couldn't be parsed")
return nil
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("from federator channel wasn't set on context")
l.Error("CREATE: from federator channel wasn't set on context")
return nil
}
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
if !ok {
l.Error("from federator channel was set on context but couldn't be parsed")
l.Error("CREATE: from federator channel was set on context but couldn't be parsed")
return nil
}
@ -90,7 +92,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
// CREATE SOMETHING
create, ok := asType.(vocab.ActivityStreamsCreate)
if !ok {
return errors.New("could not convert type to create")
return errors.New("CREATE: could not convert type to create")
}
object := create.GetActivityStreamsObject()
for objectIter := object.Begin(); objectIter != object.End(); objectIter = objectIter.Next() {
@ -100,7 +102,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
note := objectIter.GetActivityStreamsNote()
status, err := f.typeConverter.ASStatusToStatus(note)
if err != nil {
return fmt.Errorf("error converting note to status: %s", err)
return fmt.Errorf("CREATE: error converting note to status: %s", err)
}
// id the status based on the time it was created
@ -117,7 +119,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
return nil
}
// an actual error has happened
return fmt.Errorf("database error inserting status: %s", err)
return fmt.Errorf("CREATE: database error inserting status: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
@ -132,12 +134,12 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
// FOLLOW SOMETHING
follow, ok := asType.(vocab.ActivityStreamsFollow)
if !ok {
return errors.New("could not convert type to follow")
return errors.New("CREATE: could not convert type to follow")
}
followRequest, err := f.typeConverter.ASFollowToFollowRequest(follow)
if err != nil {
return fmt.Errorf("could not convert Follow to follow request: %s", err)
return fmt.Errorf("CREATE: could not convert Follow to follow request: %s", err)
}
newID, err := id.NewULID()
@ -147,7 +149,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
followRequest.ID = newID
if err := f.db.Put(followRequest); err != nil {
return fmt.Errorf("database error inserting follow request: %s", err)
return fmt.Errorf("CREATE: database error inserting follow request: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
@ -160,12 +162,12 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
// LIKE SOMETHING
like, ok := asType.(vocab.ActivityStreamsLike)
if !ok {
return errors.New("could not convert type to like")
return errors.New("CREATE: could not convert type to like")
}
fave, err := f.typeConverter.ASLikeToFave(like)
if err != nil {
return fmt.Errorf("could not convert Like to fave: %s", err)
return fmt.Errorf("CREATE: could not convert Like to fave: %s", err)
}
newID, err := id.NewULID()
@ -175,7 +177,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
fave.ID = newID
if err := f.db.Put(fave); err != nil {
return fmt.Errorf("database error inserting fave: %s", err)
return fmt.Errorf("CREATE: database error inserting fave: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
@ -188,12 +190,12 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
// BLOCK SOMETHING
blockable, ok := asType.(vocab.ActivityStreamsBlock)
if !ok {
return errors.New("could not convert type to block")
return errors.New("CREATE: could not convert type to block")
}
block, err := f.typeConverter.ASBlockToBlock(blockable)
if err != nil {
return fmt.Errorf("could not convert Block to gts model block")
return fmt.Errorf("CREATE: could not convert Block to gts model block")
}
newID, err := id.NewULID()
@ -203,7 +205,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
block.ID = newID
if err := f.db.Put(block); err != nil {
return fmt.Errorf("database error inserting block: %s", err)
return fmt.Errorf("CREATE: database error inserting block: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{

View File

@ -26,25 +26,27 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
)
l.Debugf("received DELETE id %s", id.String())
inboxAcctI := ctx.Value(util.APAccount)
if inboxAcctI == nil {
l.Error("inbox account wasn't set on context")
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
inboxAcct, ok := inboxAcctI.(*gtsmodel.Account)
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("inbox account was set on context but couldn't be parsed")
l.Error("DELETE: target account was set on context but couldn't be parsed")
return nil
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("from federator channel wasn't set on context")
l.Error("DELETE: from federator channel wasn't set on context")
return nil
}
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
if !ok {
l.Error("from federator channel was set on context but couldn't be parsed")
l.Error("DELETE: from federator channel was set on context but couldn't be parsed")
return nil
}
@ -57,13 +59,13 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
// it's a status
l.Debugf("uri is for status with id: %s", s.ID)
if err := f.db.DeleteByID(s.ID, &gtsmodel.Status{}); err != nil {
return fmt.Errorf("Delete: err deleting status: %s", err)
return fmt.Errorf("DELETE: err deleting status: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
APObjectType: gtsmodel.ActivityStreamsNote,
APActivityType: gtsmodel.ActivityStreamsDelete,
GTSModel: s,
ReceivingAccount: inboxAcct,
ReceivingAccount: targetAcct,
}
}
@ -72,13 +74,13 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
// it's an account
l.Debugf("uri is for an account with id: %s", s.ID)
if err := f.db.DeleteByID(a.ID, &gtsmodel.Account{}); err != nil {
return fmt.Errorf("Delete: err deleting account: %s", err)
return fmt.Errorf("DELETE: err deleting account: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
APObjectType: gtsmodel.ActivityStreamsProfile,
APActivityType: gtsmodel.ActivityStreamsDelete,
GTSModel: a,
ReceivingAccount: inboxAcct,
ReceivingAccount: targetAcct,
}
}

View File

@ -32,19 +32,19 @@ func (f *federatingDB) Followers(c context.Context, actorIRI *url.URL) (follower
if util.IsUserPath(actorIRI) {
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: actorIRI.String()}}, acct); err != nil {
return nil, fmt.Errorf("db error getting account with uri %s: %s", actorIRI.String(), err)
return nil, fmt.Errorf("FOLLOWERS: db error getting account with uri %s: %s", actorIRI.String(), err)
}
} else if util.IsFollowersPath(actorIRI) {
if err := f.db.GetWhere([]db.Where{{Key: "followers_uri", Value: actorIRI.String()}}, acct); err != nil {
return nil, fmt.Errorf("db error getting account with followers uri %s: %s", actorIRI.String(), err)
return nil, fmt.Errorf("FOLLOWERS: db error getting account with followers uri %s: %s", actorIRI.String(), err)
}
} else {
return nil, fmt.Errorf("could not parse actor IRI %s as users or followers path", actorIRI.String())
return nil, fmt.Errorf("FOLLOWERS: could not parse actor IRI %s as users or followers path", actorIRI.String())
}
acctFollowers := []gtsmodel.Follow{}
if err := f.db.GetFollowersByAccountID(acct.ID, &acctFollowers, false); err != nil {
return nil, fmt.Errorf("db error getting followers for account id %s: %s", acct.ID, err)
return nil, fmt.Errorf("FOLLOWERS: db error getting followers for account id %s: %s", acct.ID, err)
}
followers = streams.NewActivityStreamsCollection()
@ -52,11 +52,11 @@ func (f *federatingDB) Followers(c context.Context, actorIRI *url.URL) (follower
for _, follow := range acctFollowers {
gtsFollower := &gtsmodel.Account{}
if err := f.db.GetByID(follow.AccountID, gtsFollower); err != nil {
return nil, fmt.Errorf("db error getting account id %s: %s", follow.AccountID, err)
return nil, fmt.Errorf("FOLLOWERS: db error getting account id %s: %s", follow.AccountID, err)
}
uri, err := url.Parse(gtsFollower.URI)
if err != nil {
return nil, fmt.Errorf("error parsing %s as url: %s", gtsFollower.URI, err)
return nil, fmt.Errorf("FOLLOWERS: error parsing %s as url: %s", gtsFollower.URI, err)
}
items.AppendIRI(uri)
}

View File

@ -31,19 +31,19 @@ func (f *federatingDB) Following(c context.Context, actorIRI *url.URL) (followin
acct := &gtsmodel.Account{}
if util.IsUserPath(actorIRI) {
if err := f.db.GetWhere([]db.Where{{Key: "uri", Value: actorIRI.String()}}, acct); err != nil {
return nil, fmt.Errorf("db error getting account with uri %s: %s", actorIRI.String(), err)
return nil, fmt.Errorf("FOLLOWING: db error getting account with uri %s: %s", actorIRI.String(), err)
}
} else if util.IsFollowingPath(actorIRI) {
if err := f.db.GetWhere([]db.Where{{Key: "following_uri", Value: actorIRI.String()}}, acct); err != nil {
return nil, fmt.Errorf("db error getting account with following uri %s: %s", actorIRI.String(), err)
return nil, fmt.Errorf("FOLLOWING: db error getting account with following uri %s: %s", actorIRI.String(), err)
}
} else {
return nil, fmt.Errorf("could not parse actor IRI %s as users or following path", actorIRI.String())
return nil, fmt.Errorf("FOLLOWING: could not parse actor IRI %s as users or following path", actorIRI.String())
}
acctFollowing := []gtsmodel.Follow{}
if err := f.db.GetFollowingByAccountID(acct.ID, &acctFollowing); err != nil {
return nil, fmt.Errorf("db error getting following for account id %s: %s", acct.ID, err)
return nil, fmt.Errorf("FOLLOWING: db error getting following for account id %s: %s", acct.ID, err)
}
following = streams.NewActivityStreamsCollection()
@ -51,11 +51,11 @@ func (f *federatingDB) Following(c context.Context, actorIRI *url.URL) (followin
for _, follow := range acctFollowing {
gtsFollowing := &gtsmodel.Account{}
if err := f.db.GetByID(follow.AccountID, gtsFollowing); err != nil {
return nil, fmt.Errorf("db error getting account id %s: %s", follow.AccountID, err)
return nil, fmt.Errorf("FOLLOWING: db error getting account id %s: %s", follow.AccountID, err)
}
uri, err := url.Parse(gtsFollowing.URI)
if err != nil {
return nil, fmt.Errorf("error parsing %s as url: %s", gtsFollowing.URI, err)
return nil, fmt.Errorf("FOLLOWING: error parsing %s as url: %s", gtsFollowing.URI, err)
}
items.AppendIRI(uri)
}

View File

@ -33,7 +33,9 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo)
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
l.Error("UNDO: target account wasn't set on context")
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
targetAcct, ok := targetAcctI.(*gtsmodel.Account)

View File

@ -41,31 +41,34 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
l.Debugf("received UPDATE asType %s", string(b))
receivingAcctI := ctx.Value(util.APAccount)
if receivingAcctI == nil {
l.Error("receiving account wasn't set on context")
targetAcctI := ctx.Value(util.APAccount)
if targetAcctI == nil {
// If the target account wasn't set on the context, that means this request didn't pass through the
// API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
}
receivingAcct, ok := receivingAcctI.(*gtsmodel.Account)
targetAcct, ok := targetAcctI.(*gtsmodel.Account)
if !ok {
l.Error("receiving account was set on context but couldn't be parsed")
l.Error("UPDATE: target account was set on context but couldn't be parsed")
}
requestingAcctI := ctx.Value(util.APRequestingAccount)
if receivingAcctI == nil {
l.Error("requesting account wasn't set on context")
if targetAcctI == nil {
l.Error("UPDATE: requesting account wasn't set on context")
}
requestingAcct, ok := requestingAcctI.(*gtsmodel.Account)
if !ok {
l.Error("requesting account was set on context but couldn't be parsed")
l.Error("UPDATE: requesting account was set on context but couldn't be parsed")
}
fromFederatorChanI := ctx.Value(util.APFromFederatorChanKey)
if fromFederatorChanI == nil {
l.Error("from federator channel wasn't set on context")
l.Error("UPDATE: from federator channel wasn't set on context")
}
fromFederatorChan, ok := fromFederatorChanI.(chan gtsmodel.FromFederator)
if !ok {
l.Error("from federator channel was set on context but couldn't be parsed")
l.Error("UPDATE: from federator channel was set on context but couldn't be parsed")
}
typeName := asType.GetTypeName()
@ -82,42 +85,42 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
l.Debug("got update for APPLICATION")
i, ok := asType.(vocab.ActivityStreamsApplication)
if !ok {
return errors.New("could not convert type to application")
return errors.New("UPDATE: could not convert type to application")
}
accountable = i
case gtsmodel.ActivityStreamsGroup:
l.Debug("got update for GROUP")
i, ok := asType.(vocab.ActivityStreamsGroup)
if !ok {
return errors.New("could not convert type to group")
return errors.New("UPDATE: could not convert type to group")
}
accountable = i
case gtsmodel.ActivityStreamsOrganization:
l.Debug("got update for ORGANIZATION")
i, ok := asType.(vocab.ActivityStreamsOrganization)
if !ok {
return errors.New("could not convert type to organization")
return errors.New("UPDATE: could not convert type to organization")
}
accountable = i
case gtsmodel.ActivityStreamsPerson:
l.Debug("got update for PERSON")
i, ok := asType.(vocab.ActivityStreamsPerson)
if !ok {
return errors.New("could not convert type to person")
return errors.New("UPDATE: could not convert type to person")
}
accountable = i
case gtsmodel.ActivityStreamsService:
l.Debug("got update for SERVICE")
i, ok := asType.(vocab.ActivityStreamsService)
if !ok {
return errors.New("could not convert type to service")
return errors.New("UPDATE: could not convert type to service")
}
accountable = i
}
updatedAcct, err := f.typeConverter.ASRepresentationToAccount(accountable, true)
if err != nil {
return fmt.Errorf("error converting to account: %s", err)
return fmt.Errorf("UPDATE: error converting to account: %s", err)
}
if updatedAcct.Domain == f.config.Host {
@ -127,19 +130,19 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
}
if requestingAcct.URI != updatedAcct.URI {
return fmt.Errorf("update for account %s was requested by account %s, this is not valid", updatedAcct.URI, requestingAcct.URI)
return fmt.Errorf("UPDATE: update for account %s was requested by account %s, this is not valid", updatedAcct.URI, requestingAcct.URI)
}
updatedAcct.ID = requestingAcct.ID // set this here so the db will update properly instead of trying to PUT this and getting constraint issues
if err := f.db.UpdateByID(requestingAcct.ID, updatedAcct); err != nil {
return fmt.Errorf("database error inserting updated account: %s", err)
return fmt.Errorf("UPDATE: database error inserting updated account: %s", err)
}
fromFederatorChan <- gtsmodel.FromFederator{
APObjectType: gtsmodel.ActivityStreamsProfile,
APActivityType: gtsmodel.ActivityStreamsUpdate,
GTSModel: updatedAcct,
ReceivingAccount: receivingAcct,
ReceivingAccount: targetAcct,
}
}