From 3caae376e77a270f57733093163eafa3db8c71bc Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Mon, 22 Nov 2021 19:03:21 +0100 Subject: [PATCH] Fix streamed messages ending up in wrong timeline(s) (#325) * define timeline consts * remove double stream of status * change test stream creation up a bit * stream messages more selectively * add test for streaming new status creation via clientAPI * tidy code + comments a bit * tidy up tests * make sure new status isn't streamed to public --- internal/processing/fromclientapi_test.go | 118 ++++++++++++++++++ internal/processing/fromcommon.go | 27 ++-- internal/processing/fromfederator_test.go | 43 ++++--- internal/processing/streaming/notification.go | 2 +- internal/processing/streaming/openstream.go | 6 +- internal/processing/streaming/streamdelete.go | 2 +- internal/processing/streaming/streaming.go | 4 +- .../processing/streaming/streamtoaccount.go | 18 ++- internal/processing/streaming/update.go | 4 +- internal/stream/stream.go | 34 +++-- 10 files changed, 208 insertions(+), 50 deletions(-) create mode 100644 internal/processing/fromclientapi_test.go diff --git a/internal/processing/fromclientapi_test.go b/internal/processing/fromclientapi_test.go new file mode 100644 index 000000000..e1d4b1987 --- /dev/null +++ b/internal/processing/fromclientapi_test.go @@ -0,0 +1,118 @@ +/* + GoToSocial + Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +package processing_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromClientAPITestSuite struct { + ProcessingStandardTestSuite +} + +func (suite *FromClientAPITestSuite) TestProcessStreamNewStatus() { + ctx := context.Background() + + // let's say that the admin account posts a new status: it should end up in the + // timeline of any account that follows it and has a stream open + postingAccount := suite.testAccounts["admin_account"] + receivingAccount := suite.testAccounts["local_account_1"] + + // open a home timeline stream for zork + wssStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelineHome) + suite.NoError(errWithCode) + + // open another stream for zork, but for a different timeline; + // this shouldn't get stuff streamed into it, since it's for the public timeline + irrelevantStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelinePublic) + suite.NoError(errWithCode) + + // make a new status from admin account + newStatus := >smodel.Status{ + ID: "01FN4B2F88TF9676DYNXWE1WSS", + URI: "http://localhost:8080/users/admin/statuses/01FN4B2F88TF9676DYNXWE1WSS", + URL: "http://localhost:8080/@admin/statuses/01FN4B2F88TF9676DYNXWE1WSS", + Content: "this status should stream :)", + AttachmentIDs: []string{}, + TagIDs: []string{}, + MentionIDs: []string{}, + EmojiIDs: []string{}, + CreatedAt: testrig.TimeMustParse("2021-10-20T11:36:45Z"), + UpdatedAt: testrig.TimeMustParse("2021-10-20T11:36:45Z"), + Local: true, + AccountURI: "http://localhost:8080/users/admin", + AccountID: "01F8MH17FWEB39HZJ76B6VXSKF", + InReplyToID: "", + BoostOfID: "", + ContentWarning: "", + Visibility: gtsmodel.VisibilityFollowersOnly, + Sensitive: false, + Language: "en", + CreatedWithApplicationID: "01F8MGXQRHYF5QPMTMXP78QC2F", + Federated: false, // set federated as false for this one, since we're not testing federation stuff now + Boostable: true, + Replyable: true, + Likeable: true, + ActivityStreamsType: ap.ObjectNote, + } + + // put the status in the db first, to mimic what would have already happened earlier up the flow + err := suite.db.PutStatus(ctx, newStatus) + suite.NoError(err) + + // process the new status + err = suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: newStatus, + OriginAccount: postingAccount, + }) + suite.NoError(err) + + // zork's stream should have the newly created status in it now + msg := <-wssStream.Messages + suite.Equal(stream.EventTypeUpdate, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) + statusStreamed := &model.Status{} + err = json.Unmarshal([]byte(msg.Payload), statusStreamed) + suite.NoError(err) + suite.Equal("01FN4B2F88TF9676DYNXWE1WSS", statusStreamed.ID) + suite.Equal("this status should stream :)", statusStreamed.Content) + + // and stream should now be empty + suite.Empty(wssStream.Messages) + + // the irrelevant messages stream should also be empty + suite.Empty(irrelevantStream.Messages) +} + +func TestFromClientAPITestSuite(t *testing.T) { + suite.Run(t, &FromClientAPITestSuite{}) +} diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index 88f9994d4..ec45a3a57 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -27,6 +27,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/stream" ) func (p *processor) notifyStatus(ctx context.Context, status *gtsmodel.Status) error { @@ -328,6 +329,8 @@ func (p *processor) notifyAnnounce(ctx context.Context, status *gtsmodel.Status) return nil } +// timelineStatus processes the given new status and inserts it into +// the HOME timelines of accounts that follow the status author. func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status) error { // make sure the author account is pinned onto the status if status.Account == nil { @@ -376,14 +379,18 @@ func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status) close(errors) if len(errs) != 0 { - // we have some errors + // we have at least one error return fmt.Errorf("timelineStatus: one or more errors timelining statuses: %s", strings.Join(errs, ";")) } - // no errors, nice return nil } +// timelineStatusForAccount puts the given status in the HOME timeline +// of the account with given accountID, if it's hometimelineable. +// +// If the status was inserted into the home timeline of the given account, +// it will also be streamed via websockets to the user. func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmodel.Status, accountID string, errors chan error, wg *sync.WaitGroup) { defer wg.Done() @@ -412,28 +419,22 @@ func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmod return } - // the status was inserted to stream it to the user + // the status was inserted so stream it to the user if inserted { apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount) if err != nil { errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) - } else { - if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil { - errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) - } + return } - } - apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount) - if err != nil { - errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err) - } else { - if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil { + if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount, stream.TimelineHome); err != nil { errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err) } } } +// deleteStatusFromTimelines completely removes the given status from all timelines. +// It will also stream deletion of the status to all open streams. func (p *processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error { if err := p.timelineManager.WipeStatusFromAllTimelines(ctx, status.ID); err != nil { return err diff --git a/internal/processing/fromfederator_test.go b/internal/processing/fromfederator_test.go index 6846357d1..0351e5a43 100644 --- a/internal/processing/fromfederator_test.go +++ b/internal/processing/fromfederator_test.go @@ -32,6 +32,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -115,6 +116,9 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() { Likeable: true, } + wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), repliedAccount, stream.TimelineHome) + suite.NoError(errWithCode) + // id the status based on the time it was created statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt) suite.NoError(err) @@ -153,6 +157,17 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() { suite.Equal(replyingStatus.AccountID, notif.OriginAccountID) suite.Equal(replyingStatus.ID, notif.StatusID) suite.False(notif.Read) + + // the notification should also be streamed + msg := <-wssStream.Messages + suite.Equal(stream.EventTypeNotification, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) + notifStreamed := &model.Notification{} + err = json.Unmarshal([]byte(msg.Payload), notifStreamed) + suite.NoError(err) + suite.Equal("mention", notifStreamed.Type) + suite.Equal(replyingAccount.ID, notifStreamed.Account.ID) } func (suite *FromFederatorTestSuite) TestProcessFave() { @@ -160,7 +175,7 @@ func (suite *FromFederatorTestSuite) TestProcessFave() { favedStatus := suite.testStatuses["local_account_1_status_1"] favingAccount := suite.testAccounts["remote_account_1"] - stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, "user") + wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, stream.TimelineNotifications) suite.NoError(errWithCode) fave := >smodel.StatusFave{ @@ -210,10 +225,10 @@ func (suite *FromFederatorTestSuite) TestProcessFave() { suite.False(notif.Read) // 2. a notification should be streamed - msg := <-stream.Messages - suite.Equal("notification", msg.Event) + msg := <-wssStream.Messages + suite.Equal(stream.EventTypeNotification, msg.Event) suite.NotEmpty(msg.Payload) - suite.EqualValues([]string{"user"}, msg.Stream) + suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream) } // TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for @@ -227,7 +242,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun favedStatus := suite.testStatuses["local_account_1_status_1"] favingAccount := suite.testAccounts["remote_account_1"] - stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, "user") + wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, stream.TimelineHome) suite.NoError(errWithCode) fave := >smodel.StatusFave{ @@ -277,7 +292,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun suite.False(notif.Read) // 2. no notification should be streamed to the account that received the fave message, because they weren't the target - suite.Empty(stream.Messages) + suite.Empty(wssStream.Messages) } func (suite *FromFederatorTestSuite) TestProcessAccountDelete() { @@ -368,7 +383,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() { // target is a locked account targetAccount := suite.testAccounts["local_account_2"] - stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user") + wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome) suite.NoError(errWithCode) // put the follow request in the database as though it had passed through the federating db already @@ -397,10 +412,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() { suite.NoError(err) // a notification should be streamed - msg := <-stream.Messages - suite.Equal("notification", msg.Event) + msg := <-wssStream.Messages + suite.Equal(stream.EventTypeNotification, msg.Event) suite.NotEmpty(msg.Payload) - suite.EqualValues([]string{"user"}, msg.Stream) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) notif := &model.Notification{} err = json.Unmarshal([]byte(msg.Payload), notif) suite.NoError(err) @@ -419,7 +434,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() { // target is an unlocked account targetAccount := suite.testAccounts["local_account_1"] - stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user") + wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome) suite.NoError(errWithCode) // put the follow request in the database as though it had passed through the federating db already @@ -448,10 +463,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() { suite.NoError(err) // a notification should be streamed - msg := <-stream.Messages - suite.Equal("notification", msg.Event) + msg := <-wssStream.Messages + suite.Equal(stream.EventTypeNotification, msg.Event) suite.NotEmpty(msg.Payload) - suite.EqualValues([]string{"user"}, msg.Stream) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) notif := &model.Notification{} err = json.Unmarshal([]byte(msg.Payload), notif) suite.NoError(err) diff --git a/internal/processing/streaming/notification.go b/internal/processing/streaming/notification.go index 870490be4..7f8cfb8ac 100644 --- a/internal/processing/streaming/notification.go +++ b/internal/processing/streaming/notification.go @@ -33,5 +33,5 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun return fmt.Errorf("error marshalling notification to json: %s", err) } - return p.streamToAccount(string(bytes), stream.EventTypeNotification, account.ID) + return p.streamToAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID) } diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go index 706cc0675..c256842a4 100644 --- a/internal/processing/streaming/openstream.go +++ b/internal/processing/streaming/openstream.go @@ -30,11 +30,11 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/stream" ) -func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { +func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) { l := logrus.WithFields(logrus.Fields{ "func": "OpenStreamForAccount", "account": account.ID, - "streamType": streamType, + "streamType": streamTimeline, }) l.Debug("received open stream request") @@ -46,7 +46,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel. thisStream := &stream.Stream{ ID: streamID, - Type: streamType, + Timeline: streamTimeline, Messages: make(chan *stream.Message, 100), Hangup: make(chan interface{}, 1), Connected: true, diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go index 8332c37dc..6eb271bff 100644 --- a/internal/processing/streaming/streamdelete.go +++ b/internal/processing/streaming/streamdelete.go @@ -37,7 +37,7 @@ func (p *processor) StreamDelete(statusID string) error { // stream the delete to every account for _, accountID := range accountIDs { - if err := p.streamToAccount(statusID, stream.EventTypeDelete, accountID); err != nil { + if err := p.streamToAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil { errs = append(errs, err.Error()) } } diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go index 604ec1267..296c07f09 100644 --- a/internal/processing/streaming/streaming.go +++ b/internal/processing/streaming/streaming.go @@ -35,9 +35,9 @@ type Processor interface { // AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error) // OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. - OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) + OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, timeline string) (*stream.Stream, gtserror.WithCode) // StreamUpdateToAccount streams the given update to any open, appropriate streams belonging to the given account. - StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error + StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error // StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account. StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error // StreamDelete streams the delete of the given statusID to *ALL* open streams. diff --git a/internal/processing/streaming/streamtoaccount.go b/internal/processing/streaming/streamtoaccount.go index 140910ab7..b950eecca 100644 --- a/internal/processing/streaming/streamtoaccount.go +++ b/internal/processing/streaming/streamtoaccount.go @@ -25,7 +25,7 @@ import ( ) // streamToAccount streams the given payload with the given event type to any streams currently open for the given account ID. -func (p *processor) streamToAccount(payload string, event stream.EventType, accountID string) error { +func (p *processor) streamToAccount(payload string, event string, timelines []string, accountID string) error { v, ok := p.streamMap.Load(accountID) if !ok { // no open connections so nothing to stream @@ -42,11 +42,17 @@ func (p *processor) streamToAccount(payload string, event stream.EventType, acco for _, s := range streamsForAccount.Streams { s.Lock() defer s.Unlock() - if s.Connected { - s.Messages <- &stream.Message{ - Stream: []string{s.Type}, - Event: string(event), - Payload: payload, + if !s.Connected { + continue + } + + for _, t := range timelines { + if s.Timeline == string(t) { + s.Messages <- &stream.Message{ + Stream: []string{string(t)}, + Event: string(event), + Payload: payload, + } } } } diff --git a/internal/processing/streaming/update.go b/internal/processing/streaming/update.go index da7dcb6ce..bd7bb0b12 100644 --- a/internal/processing/streaming/update.go +++ b/internal/processing/streaming/update.go @@ -27,11 +27,11 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/stream" ) -func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error { +func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error { bytes, err := json.Marshal(s) if err != nil { return fmt.Errorf("error marshalling status to json: %s", err) } - return p.streamToAccount(string(bytes), stream.EventTypeUpdate, account.ID) + return p.streamToAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID) } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 011b7dbe1..e92d610d8 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -2,18 +2,36 @@ package stream import "sync" -// EventType models a type of stream event. -type EventType string - const ( // EventTypeNotification -- a user should be shown a notification - EventTypeNotification EventType = "notification" + EventTypeNotification string = "notification" // EventTypeUpdate -- a user should be shown an update in their timeline - EventTypeUpdate EventType = "update" + EventTypeUpdate string = "update" // EventTypeDelete -- something should be deleted from a user - EventTypeDelete EventType = "delete" + EventTypeDelete string = "delete" ) +const ( + // TimelineLocal -- public statuses from the LOCAL timeline. + TimelineLocal string = "public:local" + // TimelinePublic -- public statuses, including federated ones. + TimelinePublic string = "public" + // TimelineHome -- statuses for a user's Home timeline. + TimelineHome string = "user" + // TimelineNotifications -- notification events. + TimelineNotifications string = "user:notification" + // TimelineDirect -- statuses sent to a user directly. + TimelineDirect string = "direct" +) + +// AllStatusTimelines contains all Timelines that a status could conceivably be delivered to -- useful for doing deletes. +var AllStatusTimelines = []string{ + TimelineLocal, + TimelinePublic, + TimelineHome, + TimelineDirect, +} + // StreamsForAccount is a wrapper for the multiple streams that one account can have running at the same time. // TODO: put a limit on this type StreamsForAccount struct { @@ -27,8 +45,8 @@ type StreamsForAccount struct { type Stream struct { // ID of this stream, generated during creation. ID string - // Type of this stream: user/public/etc - Type string + // Timeline of this stream: user/public/etc + Timeline string // Channel of messages for the client to read from Messages chan *Message // Channel to close when the client drops away