Skip to content

Commit f751693

Browse files
committed
Prevent deadlock during early shutdown of issue indexer
1 parent ba1698a commit f751693

File tree

1 file changed

+37
-8
lines changed

1 file changed

+37
-8
lines changed

modules/indexer/issues/indexer.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package issues
66

77
import (
88
"context"
9+
"fmt"
910
"os"
1011
"sync"
1112
"time"
@@ -51,9 +52,10 @@ type Indexer interface {
5152
}
5253

5354
type indexerHolder struct {
54-
indexer Indexer
55-
mutex sync.RWMutex
56-
cond *sync.Cond
55+
indexer Indexer
56+
mutex sync.RWMutex
57+
cond *sync.Cond
58+
cancelled bool
5759
}
5860

5961
func newIndexerHolder() *indexerHolder {
@@ -62,6 +64,13 @@ func newIndexerHolder() *indexerHolder {
6264
return h
6365
}
6466

67+
func (h *indexerHolder) cancel() {
68+
h.mutex.Lock()
69+
defer h.mutex.Unlock()
70+
h.cancelled = true
71+
h.cond.Broadcast()
72+
}
73+
6574
func (h *indexerHolder) set(indexer Indexer) {
6675
h.mutex.Lock()
6776
defer h.mutex.Unlock()
@@ -72,7 +81,7 @@ func (h *indexerHolder) set(indexer Indexer) {
7281
func (h *indexerHolder) get() Indexer {
7382
h.mutex.RLock()
7483
defer h.mutex.RUnlock()
75-
if h.indexer == nil {
84+
if h.indexer == nil && !h.cancelled {
7685
h.cond.Wait()
7786
}
7887
return h.indexer
@@ -93,6 +102,12 @@ func InitIssueIndexer(syncReindex bool) {
93102
switch setting.Indexer.IssueType {
94103
case "bleve":
95104
handler := func(data ...queue.Data) {
105+
indexer := holder.get()
106+
if indexer == nil {
107+
log.Error("Unable to get indexer!")
108+
return
109+
}
110+
96111
iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
97112
for _, datum := range data {
98113
indexerData, ok := datum.(*IndexerData)
@@ -102,12 +117,12 @@ func InitIssueIndexer(syncReindex bool) {
102117
}
103118
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
104119
if indexerData.IsDelete {
105-
_ = holder.get().Delete(indexerData.IDs...)
120+
_ = indexer.Delete(indexerData.IDs...)
106121
continue
107122
}
108123
iData = append(iData, indexerData)
109124
}
110-
if err := holder.get().Index(iData); err != nil {
125+
if err := indexer.Index(iData); err != nil {
111126
log.Error("Error whilst indexing: %v Error: %v", iData, err)
112127
}
113128
}
@@ -132,6 +147,7 @@ func InitIssueIndexer(syncReindex bool) {
132147
issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
133148
exist, err := issueIndexer.Init()
134149
if err != nil {
150+
holder.cancel()
135151
log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
136152
}
137153
populate = !exist
@@ -153,6 +169,7 @@ func InitIssueIndexer(syncReindex bool) {
153169
issueIndexer := &DBIndexer{}
154170
holder.set(issueIndexer)
155171
default:
172+
holder.cancel()
156173
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
157174
}
158175

@@ -168,10 +185,14 @@ func InitIssueIndexer(syncReindex bool) {
168185
}
169186
}
170187
waitChannel <- time.Since(start)
188+
close(waitChannel)
171189
}()
172190

173191
if syncReindex {
174-
<-waitChannel
192+
select {
193+
case <-waitChannel:
194+
case <-graceful.GetManager().IsShutdown():
195+
}
175196
} else if setting.Indexer.StartupTimeout > 0 {
176197
go func() {
177198
timeout := setting.Indexer.StartupTimeout
@@ -181,6 +202,8 @@ func InitIssueIndexer(syncReindex bool) {
181202
select {
182203
case duration := <-waitChannel:
183204
log.Info("Issue Indexer Initialization took %v", duration)
205+
case <-graceful.GetManager().IsShutdown():
206+
log.Warn("Shutdown occurred before issue index initialisation was complete")
184207
case <-time.After(timeout):
185208
if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
186209
shutdownable.Terminate()
@@ -293,7 +316,13 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
293316
// SearchIssuesByKeyword search issue ids by keywords and repo id
294317
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
295318
var issueIDs []int64
296-
res, err := holder.get().Search(keyword, repoIDs, 1000, 0)
319+
indexer := holder.get()
320+
321+
if indexer == nil {
322+
log.Error("Unable to get indexer!")
323+
return nil, fmt.Errorf("unable to get issue indexer")
324+
}
325+
res, err := indexer.Search(keyword, repoIDs, 1000, 0)
297326
if err != nil {
298327
return nil, err
299328
}

0 commit comments

Comments
 (0)