diff --git a/integrations/issue_test.go b/integrations/issue_test.go index fe66a00504..1454d75885 100644 --- a/integrations/issue_test.go +++ b/integrations/issue_test.go @@ -11,8 +11,10 @@ import ( "strconv" "strings" "testing" + "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/references" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/test" @@ -87,7 +89,12 @@ func TestViewIssuesKeyword(t *testing.T) { defer prepareTestEnv(t)() repo := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository) - + issue := models.AssertExistsAndLoadBean(t, &models.Issue{ + RepoID: repo.ID, + Index: 1, + }).(*models.Issue) + issues.UpdateIssueIndexer(issue) + time.Sleep(time.Second * 1) const keyword = "first" req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.RelLink(), keyword) resp := MakeRequest(t, req, http.StatusOK) diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go index 787ff0dec5..b9f505e4bf 100644 --- a/modules/indexer/issues/bleve.go +++ b/modules/indexer/issues/bleve.go @@ -266,3 +266,8 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) } return &ret, nil } + +// Close the Index +func (b *BleveIndexer) Close() error { + return b.indexer.Close() +} diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index a758cfeaee..2a5df80fac 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -25,6 +25,11 @@ func (db *DBIndexer) Delete(ids ...int64) error { return nil } +// Close dummy function +func (db *DBIndexer) Close() error { + return nil +} + // Search dummy function func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index ebcd3f68dd..1fcef59f34 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,12 +5,16 @@ package issues import ( + "context" + "encoding/json" + "os" "sync" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" ) @@ -44,6 +48,7 @@ type Indexer interface { Index(issue []*IndexerData) error Delete(ids ...int64) error Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) + Close() error } type indexerHolder struct { @@ -75,9 +80,8 @@ func (h *indexerHolder) get() Indexer { } var ( - issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated - issueIndexerQueue Queue + issueIndexerQueue queue.Queue holder = newIndexerHolder() ) @@ -85,88 +89,142 @@ var ( // all issue index done. func InitIssueIndexer(syncReindex bool) { waitChannel := make(chan time.Duration) + + // Create the Queue + switch setting.Indexer.IssueType { + case "bleve": + handler := func(data ...queue.Data) { + iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber) + for _, datum := range data { + indexerData, ok := datum.(*IndexerData) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) + if indexerData.IsDelete { + _ = holder.get().Delete(indexerData.IDs...) + continue + } + iData = append(iData, indexerData) + } + if err := holder.get().Index(iData); err != nil { + log.Error("Error whilst indexing: %v Error: %v", iData, err) + } + } + + queueType := queue.PersistableChannelQueueType + switch setting.Indexer.IssueQueueType { + case setting.LevelQueueType: + queueType = queue.LevelQueueType + case setting.ChannelQueueType: + queueType = queue.PersistableChannelQueueType + case setting.RedisQueueType: + queueType = queue.RedisQueueType + default: + log.Fatal("Unsupported indexer queue type: %v", + setting.Indexer.IssueQueueType) + } + + name := "issue_indexer_queue" + opts := make(map[string]interface{}) + opts["QueueLength"] = setting.Indexer.UpdateQueueLength + opts["BatchLength"] = setting.Indexer.IssueQueueBatchNumber + opts["DataDir"] = setting.Indexer.IssueQueueDir + + addrs, password, dbIdx, err := setting.ParseQueueConnStr(setting.Indexer.IssueQueueConnStr) + if queueType == queue.RedisQueueType && err != nil { + log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + opts["Addresses"] = addrs + opts["Password"] = password + opts["DBIndex"] = dbIdx + opts["QueueName"] = name + opts["Name"] = name + opts["Workers"] = 1 + opts["BlockTimeout"] = 1 * time.Second + opts["BoostTimeout"] = 5 * time.Minute + opts["BoostWorkers"] = 5 + cfg, err := json.Marshal(opts) + if err != nil { + log.Error("Unable to marshall generic options: %v Error: %v", opts, err) + log.Fatal("Unable to create issue indexer queue with type %s: %v", + queueType, + err) + } + log.Debug("Creating issue indexer queue with type %s: configuration: %s", queueType, string(cfg)) + issueIndexerQueue, err = queue.CreateQueue(queueType, handler, cfg, &IndexerData{}) + if err != nil { + issueIndexerQueue, err = queue.CreateQueue(queue.WrappedQueueType, handler, queue.WrappedQueueConfiguration{ + Underlying: queueType, + Timeout: setting.GracefulHammerTime + 30*time.Second, + MaxAttempts: 10, + Config: cfg, + QueueLength: setting.Indexer.UpdateQueueLength, + Name: name, + }, &IndexerData{}) + } + if err != nil { + log.Fatal("Unable to create issue indexer queue with type %s: %v : %v", + queueType, + string(cfg), + err) + } + default: + issueIndexerQueue = &queue.DummyQueue{} + } + + // Create the Indexer go func() { start := time.Now() - log.Info("Initializing Issue Indexer") + log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) var populate bool - var dummyQueue bool switch setting.Indexer.IssueType { case "bleve": - issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) - } - populate = !exist - holder.set(issueIndexer) + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + atTerminate(context.Background(), func() { + log.Debug("Closing issue indexer") + issueIndexer := holder.get() + if issueIndexer != nil { + err := issueIndexer.Close() + if err != nil { + log.Error("Error whilst closing the issue indexer: %v", err) + } + } + log.Info("PID: %d Issue Indexer closed", os.Getpid()) + }) + log.Debug("Created Bleve Indexer") + }) case "db": issueIndexer := &DBIndexer{} holder.set(issueIndexer) - dummyQueue = true default: log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - } else { - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - holder.get(), - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - log.Fatal( - "Unable create level queue for issue queue dir: %s batch number: %d : %v", - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber, - err) - } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", - setting.Indexer.IssueQueueConnStr, - err) - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) - if err != nil { - log.Fatal("Unable to create RedisQueue: %s : %v", - setting.Indexer.IssueQueueConnStr, - err) - } - default: - log.Fatal("Unsupported indexer queue type: %v", - setting.Indexer.IssueQueueType) - } - - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() - } - - go func() { - for data := range issueIndexerChannel { - _ = issueIndexerQueue.Push(data) - } - }() + // Start processing the queue + go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) + // Populate the index if populate { if syncReindex { - populateIssueIndexer() + graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } else { - go populateIssueIndexer() + go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } } waitChannel <- time.Since(start) }() + if syncReindex { <-waitChannel } else if setting.Indexer.StartupTimeout > 0 { @@ -179,6 +237,9 @@ func InitIssueIndexer(syncReindex bool) { case duration := <-waitChannel: log.Info("Issue Indexer Initialization took %v", duration) case <-time.After(timeout): + if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok { + shutdownable.Terminate() + } log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) } }() @@ -186,8 +247,14 @@ func InitIssueIndexer(syncReindex bool) { } // populateIssueIndexer populate the issue indexer with issue data -func populateIssueIndexer() { +func populateIssueIndexer(ctx context.Context) { for page := 1; ; page++ { + select { + case <-ctx.Done(): + log.Warn("Issue Indexer population shutdown before completion") + return + default: + } repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ Page: page, PageSize: models.RepositoryListDefaultPageSize, @@ -200,10 +267,17 @@ func populateIssueIndexer() { continue } if len(repos) == 0 { + log.Debug("Issue Indexer population complete") return } for _, repo := range repos { + select { + case <-ctx.Done(): + log.Info("Issue Indexer population shutdown before completion") + return + default: + } UpdateRepoIndexer(repo) } } @@ -237,13 +311,17 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - issueIndexerChannel <- &IndexerData{ + indexerData := &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, } + log.Debug("Adding to channel: %v", indexerData) + if err := issueIndexerQueue.Push(indexerData); err != nil { + log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -258,11 +336,13 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { if len(ids) == 0 { return } - - issueIndexerChannel <- &IndexerData{ + indexerData := &IndexerData{ IDs: ids, IsDelete: true, } + if err := issueIndexerQueue.Push(indexerData); err != nil { + log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go deleted file mode 100644 index f93e5c47a4..0000000000 --- a/modules/indexer/issues/queue.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2018 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -// Queue defines an interface to save an issue indexer queue -type Queue interface { - Run() error - Push(*IndexerData) error -} - -// DummyQueue represents an empty queue -type DummyQueue struct { -} - -// Run starts to run the queue -func (b *DummyQueue) Run() error { - return nil -} - -// Push pushes data to indexer -func (b *DummyQueue) Push(*IndexerData) error { - return nil -} diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go deleted file mode 100644 index b6458d3eb5..0000000000 --- a/modules/indexer/issues/queue_channel.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2018 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "time" - - "code.gitea.io/gitea/modules/setting" -) - -// ChannelQueue implements -type ChannelQueue struct { - queue chan *IndexerData - indexer Indexer - batchNumber int -} - -// NewChannelQueue create a memory channel queue -func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue { - return &ChannelQueue{ - queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength), - indexer: indexer, - batchNumber: batchNumber, - } -} - -// Run starts to run the queue -func (c *ChannelQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, c.batchNumber) - for { - select { - case data := <-c.queue: - if data.IsDelete { - _ = c.indexer.Delete(data.IDs...) - continue - } - - datas = append(datas, data) - if len(datas) >= c.batchNumber { - _ = c.indexer.Index(datas) - // TODO: save the point - datas = make([]*IndexerData, 0, c.batchNumber) - } - case <-time.After(time.Millisecond * 100): - i++ - if i >= 3 && len(datas) > 0 { - _ = c.indexer.Index(datas) - // TODO: save the point - datas = make([]*IndexerData, 0, c.batchNumber) - } - } - } -} - -// Push will push the indexer data to queue -func (c *ChannelQueue) Push(data *IndexerData) error { - c.queue <- data - return nil -} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go deleted file mode 100644 index d6187f2acb..0000000000 --- a/modules/indexer/issues/queue_disk.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "encoding/json" - "time" - - "code.gitea.io/gitea/modules/log" - - "gitea.com/lunny/levelqueue" -) - -var ( - _ Queue = &LevelQueue{} -) - -// LevelQueue implements a disk library queue -type LevelQueue struct { - indexer Indexer - queue *levelqueue.Queue - batchNumber int -} - -// NewLevelQueue creates a ledis local queue -func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) { - queue, err := levelqueue.Open(dataDir) - if err != nil { - return nil, err - } - - return &LevelQueue{ - indexer: indexer, - queue: queue, - batchNumber: batchNumber, - }, nil -} - -// Run starts to run the queue -func (l *LevelQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, l.batchNumber) - for { - i++ - if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { - _ = l.indexer.Index(datas) - datas = make([]*IndexerData, 0, l.batchNumber) - i = 0 - continue - } - - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("RPop: %v", err) - } - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - - var data IndexerData - err = json.Unmarshal(bs, &data) - if err != nil { - log.Error("Unmarshal: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("LevelQueue: task found: %#v", data) - - if data.IsDelete { - if data.ID > 0 { - if err = l.indexer.Delete(data.ID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else if len(data.IDs) > 0 { - if err = l.indexer.Delete(data.IDs...); err != nil { - log.Error("indexer.Delete: %v", err) - } - } - time.Sleep(time.Millisecond * 10) - continue - } - - datas = append(datas, &data) - time.Sleep(time.Millisecond * 10) - } -} - -// Push will push the indexer data to queue -func (l *LevelQueue) Push(data *IndexerData) error { - bs, err := json.Marshal(data) - if err != nil { - return err - } - return l.queue.LPush(bs) -} diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go deleted file mode 100644 index 0344d3c87a..0000000000 --- a/modules/indexer/issues/queue_redis.go +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "encoding/json" - "errors" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/modules/log" - - "github.com/go-redis/redis" -) - -var ( - _ Queue = &RedisQueue{} -) - -type redisClient interface { - RPush(key string, args ...interface{}) *redis.IntCmd - LPop(key string) *redis.StringCmd - Ping() *redis.StatusCmd -} - -// RedisQueue redis queue -type RedisQueue struct { - client redisClient - queueName string - indexer Indexer - batchNumber int -} - -func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { - fields := strings.Fields(connStr) - for _, f := range fields { - items := strings.SplitN(f, "=", 2) - if len(items) < 2 { - continue - } - switch strings.ToLower(items[0]) { - case "addrs": - addrs = items[1] - case "password": - password = items[1] - case "db": - dbIdx, err = strconv.Atoi(items[1]) - if err != nil { - return - } - } - } - return -} - -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) { - dbs := strings.Split(addrs, ",") - var queue = RedisQueue{ - queueName: "issue_indexer_queue", - indexer: indexer, - batchNumber: batchNumber, - } - if len(dbs) == 0 { - return nil, errors.New("no redis host found") - } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: password, // no password set - DB: dbIdx, // use default DB - }) - } else { - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - }) - } - if err := queue.client.Ping().Err(); err != nil { - return nil, err - } - return &queue, nil -} - -// Run runs the redis queue -func (r *RedisQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, r.batchNumber) - for { - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("LPop faile: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - i++ - if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) { - _ = r.indexer.Index(datas) - datas = make([]*IndexerData, 0, r.batchNumber) - i = 0 - } - - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - - var data IndexerData - err = json.Unmarshal(bs, &data) - if err != nil { - log.Error("Unmarshal: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: task found: %#v", data) - - if data.IsDelete { - if data.ID > 0 { - if err = r.indexer.Delete(data.ID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else if len(data.IDs) > 0 { - if err = r.indexer.Delete(data.IDs...); err != nil { - log.Error("indexer.Delete: %v", err) - } - } - time.Sleep(time.Millisecond * 100) - continue - } - - datas = append(datas, &data) - time.Sleep(time.Millisecond * 100) - } -} - -// Push implements Queue -func (r *RedisQueue) Push(data *IndexerData) error { - bs, err := json.Marshal(data) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -}