Skip to content

Commit 49396ba

Browse files
committed
Automatically pause queue if issue index service is unavailable
1 parent 80adbeb commit 49396ba

File tree

4 files changed

+125
-13
lines changed

4 files changed

+125
-13
lines changed

modules/indexer/issues/bleve.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,15 @@ func (b *BleveIndexer) Init() (bool, error) {
186186
return false, err
187187
}
188188

189+
// SetAvailabilityChangeCallback does nothing
190+
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
191+
}
192+
193+
// Ping does nothing
194+
func (b *BleveIndexer) Ping() bool {
195+
return true
196+
}
197+
189198
// Close will close the bleve indexer
190199
func (b *BleveIndexer) Close() {
191200
if b.indexer != nil {

modules/indexer/issues/db.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ func (db *DBIndexer) Init() (bool, error) {
1414
return false, nil
1515
}
1616

17+
// SetAvailabilityChangeCallback dummy function
18+
func (db *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
19+
}
20+
21+
// Ping checks if database is available
22+
func (db *DBIndexer) Ping() bool {
23+
return models.Ping() != nil
24+
}
25+
1726
// Index dummy function
1827
func (db *DBIndexer) Index(issue []*IndexerData) error {
1928
return nil

modules/indexer/issues/elastic_search.go

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ var _ Indexer = &ElasticSearchIndexer{}
2020

2121
// ElasticSearchIndexer implements Indexer interface
2222
type ElasticSearchIndexer struct {
23-
client *elastic.Client
24-
indexerName string
23+
client *elastic.Client
24+
indexerName string
25+
available bool
26+
availabilityCallback func(bool)
27+
stopTimer chan struct{}
2528
}
2629

2730
type elasticLogger struct {
@@ -56,10 +59,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er
5659
return nil, err
5760
}
5861

59-
return &ElasticSearchIndexer{
62+
indexer := &ElasticSearchIndexer{
6063
client: client,
6164
indexerName: indexerName,
62-
}, nil
65+
available: true,
66+
stopTimer: make(chan struct{}),
67+
}
68+
69+
ticker := time.NewTicker(10 * time.Second)
70+
go func() {
71+
for {
72+
select {
73+
case <-ticker.C:
74+
indexer.checkAvailability()
75+
case <-indexer.stopTimer:
76+
ticker.Stop()
77+
return
78+
}
79+
}
80+
}()
81+
82+
return indexer, nil
6383
}
6484

6585
const (
@@ -96,15 +116,15 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
96116
ctx := context.Background()
97117
exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
98118
if err != nil {
99-
return false, err
119+
return false, b.checkError(err)
100120
}
101121

102122
if !exists {
103123
mapping := defaultMapping
104124

105125
createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
106126
if err != nil {
107-
return false, err
127+
return false, b.checkError(err)
108128
}
109129
if !createIndex.Acknowledged {
110130
return false, errors.New("init failed")
@@ -115,6 +135,16 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
115135
return true, nil
116136
}
117137

138+
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
139+
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
140+
b.availabilityCallback = callback
141+
}
142+
143+
// Ping checks if elastic is available
144+
func (b *ElasticSearchIndexer) Ping() bool {
145+
return b.available
146+
}
147+
118148
// Index will save the index data
119149
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
120150
if len(issues) == 0 {
@@ -132,7 +162,7 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
132162
"comments": issue.Comments,
133163
}).
134164
Do(context.Background())
135-
return err
165+
return b.checkError(err)
136166
}
137167

138168
reqs := make([]elastic.BulkableRequest, 0)
@@ -155,7 +185,7 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
155185
Index(b.indexerName).
156186
Add(reqs...).
157187
Do(context.Background())
158-
return err
188+
return b.checkError(err)
159189
}
160190

161191
// Delete deletes indexes by ids
@@ -167,7 +197,7 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
167197
Index(b.indexerName).
168198
Id(fmt.Sprintf("%d", ids[0])).
169199
Do(context.Background())
170-
return err
200+
return b.checkError(err)
171201
}
172202

173203
reqs := make([]elastic.BulkableRequest, 0)
@@ -183,7 +213,7 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
183213
Index(b.indexerName).
184214
Add(reqs...).
185215
Do(context.Background())
186-
return err
216+
return b.checkError(err)
187217
}
188218

189219
// Search searches for issues by given conditions.
@@ -207,7 +237,7 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
207237
From(start).Size(limit).
208238
Do(context.Background())
209239
if err != nil {
210-
return nil, err
240+
return nil, b.checkError(err)
211241
}
212242

213243
hits := make([]Match, 0, limit)
@@ -225,4 +255,32 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
225255
}
226256

227257
// Close implements indexer
228-
func (b *ElasticSearchIndexer) Close() {}
258+
func (b *ElasticSearchIndexer) Close() {
259+
close(b.stopTimer)
260+
}
261+
262+
func (b *ElasticSearchIndexer) checkError(err error) error {
263+
if elastic.IsConnErr(err) && b.available {
264+
b.available = false
265+
if b.availabilityCallback != nil {
266+
b.availabilityCallback(b.available)
267+
}
268+
}
269+
return err
270+
}
271+
272+
func (b *ElasticSearchIndexer) checkAvailability() {
273+
if b.available {
274+
return
275+
}
276+
277+
// Request cluster state to check if elastic is available again
278+
_, err := b.client.ClusterState().Do(context.Background())
279+
if err != nil {
280+
return
281+
}
282+
b.available = true
283+
if b.availabilityCallback != nil {
284+
b.availabilityCallback(b.available)
285+
}
286+
}

modules/indexer/issues/indexer.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type SearchResult struct {
4747
// Indexer defines an interface to indexer issues contents
4848
type Indexer interface {
4949
Init() (bool, error)
50+
Ping() bool
51+
SetAvailabilityChangeCallback(callback func(bool))
5052
Index(issue []*IndexerData) error
5153
Delete(ids ...int64) error
5254
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
@@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) {
111113
}
112114

113115
iData := make([]*IndexerData, 0, len(data))
116+
unhandled := make([]queue.Data, 0, len(data))
114117
for _, datum := range data {
115118
indexerData, ok := datum.(*IndexerData)
116119
if !ok {
@@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) {
119122
}
120123
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
121124
if indexerData.IsDelete {
122-
_ = indexer.Delete(indexerData.IDs...)
125+
if err := indexer.Delete(indexerData.IDs...); err != nil {
126+
log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
127+
if indexer.Ping() {
128+
continue
129+
}
130+
// Add back to queue
131+
unhandled = append(unhandled, datum)
132+
}
123133
continue
124134
}
125135
iData = append(iData, indexerData)
126136
}
137+
if len(unhandled) > 0 {
138+
for _, indexerData := range iData {
139+
unhandled = append(unhandled, indexerData)
140+
}
141+
return unhandled
142+
}
127143
if err := indexer.Index(iData); err != nil {
128144
log.Error("Error whilst indexing: %v Error: %v", iData, err)
145+
if indexer.Ping() {
146+
return nil
147+
}
148+
// Add back to queue
149+
for _, indexerData := range iData {
150+
unhandled = append(unhandled, indexerData)
151+
}
152+
return unhandled
129153
}
130154
return nil
131155
}
@@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) {
193217
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
194218
}
195219

220+
if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
221+
holder.get().SetAvailabilityChangeCallback(func(available bool) {
222+
if !available {
223+
log.Info("Issue index queue paused")
224+
queue.Pause()
225+
} else {
226+
log.Info("Issue index queue resumed")
227+
queue.Resume()
228+
}
229+
})
230+
}
231+
196232
// Start processing the queue
197233
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
198234

0 commit comments

Comments
 (0)