mirror of
1
Fork 0

[bugfix] in fedi API CreateStatus(), handle case of data-race and return early (#2403)

This commit is contained in:
kim 2023-12-01 10:53:53 +00:00 committed by GitHub
parent eb170003b8
commit d1cac53cbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 72 deletions

View File

@ -151,21 +151,62 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFe
func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error {
var ( var (
status *gtsmodel.Status status *gtsmodel.Status
err error statusable ap.Statusable
err error
) )
if fMsg.APObjectModel == nil /* i.e. forwarded */ { var ok bool
switch {
case fMsg.APObjectModel != nil:
// A model was provided, extract this from message.
statusable, ok = fMsg.APObjectModel.(ap.Statusable)
if !ok {
return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel)
}
// Create bare-bones model to pass
// into RefreshStatus(), which it will
// further populate and insert as new.
bareStatus := new(gtsmodel.Status)
bareStatus.Local = util.Ptr(false)
bareStatus.URI = ap.GetJSONLDId(statusable).String()
// Call RefreshStatus() to parse and process the provided
// statusable model, which it will use to further flesh out
// the bare bones model and insert it into the database.
status, statusable, err = p.federate.RefreshStatus(ctx,
fMsg.ReceivingAccount.Username,
bareStatus,
statusable,
true,
)
if err != nil {
return gtserror.Newf("error processing new status %s: %w", bareStatus.URI, err)
}
case fMsg.APIri != nil:
// Model was not set, deref with IRI (this is a forward). // Model was not set, deref with IRI (this is a forward).
// This will also cause the status to be inserted into the db. // This will also cause the status to be inserted into the db.
status, err = p.statusFromAPIRI(ctx, fMsg) status, statusable, err = p.federate.GetStatusByURI(ctx,
} else { fMsg.ReceivingAccount.Username,
// Model is set, ensure we have the most up-to-date model. fMsg.APIri,
status, err = p.statusFromAPModel(ctx, fMsg) )
if err != nil {
return gtserror.Newf("error dereferencing forwarded status %s: %w", fMsg.APIri, err)
}
default:
return gtserror.New("neither APObjectModel nor APIri set")
} }
if err != nil { if statusable == nil {
return gtserror.Newf("error extracting status from federatorMsg: %w", err) // Another thread beat us to
// creating this status! Return
// here and let the other thread
// handle timelining + notifying.
return nil
} }
if status.InReplyToID != "" { if status.InReplyToID != "" {
@ -227,66 +268,6 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg messages.FromFediAPI)
return nil return nil
} }
func (p *fediAPI) statusFromAPModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
// AP statusable representation MUST have been set.
statusable, ok := fMsg.APObjectModel.(ap.Statusable)
if !ok {
return nil, gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel)
}
// Status may have been set (no problem if not).
status, _ := fMsg.GTSModel.(*gtsmodel.Status)
if status == nil {
// No status was set, create a bare-bones
// model for the deferencer to flesh-out,
// this indicates it is a new (to us) status.
status = &gtsmodel.Status{
// if coming in here status will ALWAYS be remote.
Local: util.Ptr(false),
URI: ap.GetJSONLDId(statusable).String(),
}
}
// Call refresh on status to either update existing
// model, or parse + insert status from statusable data.
status, _, err := p.federate.RefreshStatus(
ctx,
fMsg.ReceivingAccount.Username,
status,
statusable,
false, // Don't force refresh.
)
if err != nil {
return nil, gtserror.Newf("error refreshing status: %w", err)
}
return status, nil
}
func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
// There should be a status IRI pinned to
// the federatorMsg for us to dereference.
if fMsg.APIri == nil {
const text = "neither APObjectModel nor APIri set"
return nil, gtserror.New(text)
}
// Get the status + ensure we have
// the most up-to-date version.
status, _, err := p.federate.GetStatusByURI(
ctx,
fMsg.ReceivingAccount.Username,
fMsg.APIri,
)
if err != nil {
return nil, gtserror.Newf("error getting status by uri %s: %w", fMsg.APIri, err)
}
return status, nil
}
func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error { func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error {
followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest) followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest)
if !ok { if !ok {
@ -359,8 +340,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)
// Dereference status that this boosts, note // Dereference status that this boosts, note
// that this will handle dereferencing the status // that this will handle dereferencing the status
// ancestors / descendants where appropriate. // ancestors / descendants where appropriate.
if err := p.federate.DereferenceAnnounce( if err := p.federate.DereferenceAnnounce(ctx,
ctx,
status, status,
fMsg.ReceivingAccount.Username, fMsg.ReceivingAccount.Username,
); err != nil { ); err != nil {
@ -388,7 +368,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)
log.Errorf(ctx, "error notifying announce: %v", err) log.Errorf(ctx, "error notifying announce: %v", err)
} }
// Interaction counts changed on the boosted status; // Interaction counts changed on the original status;
// uncache the prepared version from all timelines. // uncache the prepared version from all timelines.
p.surface.invalidateStatusFromTimelines(ctx, status.ID) p.surface.invalidateStatusFromTimelines(ctx, status.ID)