From f034ee6cf0ab0806ef0b8edb5f38e6a5c37f2f7c Mon Sep 17 00:00:00 2001
From: 6543 <6543@obermui.de>
Date: Wed, 4 May 2022 18:06:23 +0200
Subject: [PATCH] PullService lock via pullID (#19520)

* lock pull on git&db actions ...

* add TODO notes

* rename prQueue 2 prPatchCheckerQueue

* fmt
---
 services/pull/check.go          | 2 ++
 services/pull/merge.go          | 7 ++++++-
 services/pull/pull.go           | 7 +++++++
 services/pull/update.go         | 3 +++
 services/repository/transfer.go | 1 +
 services/wiki/wiki.go           | 3 ++-
 6 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/services/pull/check.go b/services/pull/check.go
index 64eb93104e..6852940b22 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -317,6 +317,8 @@ func handle(data ...queue.Data) []queue.Data {
 }
 
 func testPR(id int64) {
+	pullWorkingPool.CheckIn(fmt.Sprint(id))
+	defer pullWorkingPool.CheckOut(fmt.Sprint(id))
 	ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
 	defer finished()
 
diff --git a/services/pull/merge.go b/services/pull/merge.go
index ad93b9779f..fe295cbe03 100644
--- a/services/pull/merge.go
+++ b/services/pull/merge.go
@@ -34,7 +34,6 @@ import (
 
 // Merge merges pull request to base repository.
 // Caller should check PR is ready to be merged (review and status checks)
-// FIXME: add repoWorkingPull make sure two merges does not happen at same time.
 func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) error {
 	if err := pr.LoadHeadRepo(); err != nil {
 		log.Error("LoadHeadRepo: %v", err)
@@ -44,6 +43,9 @@ func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repos
 		return fmt.Errorf("LoadBaseRepo: %v", err)
 	}
 
+	pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
+	defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
+
 	prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests)
 	if err != nil {
 		log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
@@ -726,6 +728,9 @@ func CheckPullBranchProtections(ctx context.Context, pr *models.PullRequest, ski
 
 // MergedManually mark pr as merged manually
 func MergedManually(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
+	pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
+	defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
+
 	if err := db.WithTx(func(ctx context.Context) error {
 		prUnit, err := pr.BaseRepo.GetUnitCtx(ctx, unit.TypePullRequests)
 		if err != nil {
diff --git a/services/pull/pull.go b/services/pull/pull.go
index 0d10a23681..5cef3c356f 100644
--- a/services/pull/pull.go
+++ b/services/pull/pull.go
@@ -25,9 +25,13 @@ import (
 	"code.gitea.io/gitea/modules/notification"
 	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/sync"
 	issue_service "code.gitea.io/gitea/services/issue"
 )
 
+// TODO: use clustered lock (unique queue? or *abuse* cache)
+var pullWorkingPool = sync.NewExclusivePool()
+
 // NewPullRequest creates new pull request with labels for repository.
 func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *models.Issue, labelIDs []int64, uuids []string, pr *models.PullRequest, assigneeIDs []int64) error {
 	if err := TestPatch(pr); err != nil {
@@ -124,6 +128,9 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *mode
 
 // ChangeTargetBranch changes the target branch of this pull request, as the given user.
 func ChangeTargetBranch(ctx context.Context, pr *models.PullRequest, doer *user_model.User, targetBranch string) (err error) {
+	pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
+	defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
+
 	// Current target branch is already the same
 	if pr.BaseBranch == targetBranch {
 		return nil
diff --git a/services/pull/update.go b/services/pull/update.go
index 08ff1cedfc..08967b59b3 100644
--- a/services/pull/update.go
+++ b/services/pull/update.go
@@ -23,6 +23,9 @@ func Update(ctx context.Context, pull *models.PullRequest, doer *user_model.User
 		style repo_model.MergeStyle
 	)
 
+	pullWorkingPool.CheckIn(fmt.Sprint(pull.ID))
+	defer pullWorkingPool.CheckOut(fmt.Sprint(pull.ID))
+
 	if rebase {
 		pr = pull
 		style = repo_model.MergeStyleRebaseUpdate
diff --git a/services/repository/transfer.go b/services/repository/transfer.go
index 0abb03a88d..3feeb68f22 100644
--- a/services/repository/transfer.go
+++ b/services/repository/transfer.go
@@ -19,6 +19,7 @@ import (
 )
 
 // repoWorkingPool represents a working pool to order the parallel changes to the same repository
+// TODO: use clustered lock (unique queue? or *abuse* cache)
 var repoWorkingPool = sync.NewExclusivePool()
 
 // TransferOwnership transfers all corresponding setting from old user to new one.
diff --git a/services/wiki/wiki.go b/services/wiki/wiki.go
index 454f54983c..796291fd38 100644
--- a/services/wiki/wiki.go
+++ b/services/wiki/wiki.go
@@ -27,7 +27,8 @@ import (
 
 var (
 	reservedWikiNames = []string{"_pages", "_new", "_edit", "raw"}
-	wikiWorkingPool   = sync.NewExclusivePool()
+	// TODO: use clustered lock (unique queue? or *abuse* cache)
+	wikiWorkingPool = sync.NewExclusivePool()
 )
 
 func nameAllowed(name string) error {