Skip to content

Commit 3b28eef

Browse files
committed
Implement automatic disabling and resume of code indexer queue
1 parent 3acdb13 commit 3b28eef

File tree

12 files changed

+250
-72
lines changed

12 files changed

+250
-72
lines changed

docs/content/doc/developers/hacking-on-gitea.en-us.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,27 @@ make lint-frontend
187187

188188
Note: When working on frontend code, set `USE_SERVICE_WORKER` to `false` in `app.ini` to prevent undesirable caching of frontend assets.
189189

190+
### Configuring local ElasticSearch instance
191+
192+
Start local ElasticSearch instance using docker:
193+
194+
```sh
195+
mkdir -p $(pwd)/data/elasticsearch
196+
sudo chown -R 1000:1000 $(pwd)/data/elasticsearch
197+
docker run --rm -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -e "discovery.type=single-node" -v "$(pwd)/data/elasticsearch:/usr/share/elasticsearch/data" docker.elastic.co/elasticsearch/elasticsearch:7.16.3
198+
```
199+
200+
Configure `app.ini`:
201+
202+
```ini
203+
[indexer]
204+
ISSUE_INDEXER_TYPE = elasticsearch
205+
ISSUE_INDEXER_CONN_STR = http://elastic:changeme@localhost:9200
206+
REPO_INDEXER_ENABLED = true
207+
REPO_INDEXER_TYPE = elasticsearch
208+
REPO_INDEXER_CONN_STR = http://elastic:changeme@localhost:9200
209+
```
210+
190211
### Building and adding SVGs
191212

192213
SVG icons are built using the `make svg` target which compiles the icon sources defined in `build/generate-svg.js` into the output directory `public/img/svg`. Custom icons can be added in the `web_src/svg` directory.

modules/context/repo.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
user_model "code.gitea.io/gitea/models/user"
2222
"code.gitea.io/gitea/modules/cache"
2323
"code.gitea.io/gitea/modules/git"
24+
code_indexer "code.gitea.io/gitea/modules/indexer/code"
2425
"code.gitea.io/gitea/modules/log"
2526
"code.gitea.io/gitea/modules/markup/markdown"
2627
"code.gitea.io/gitea/modules/setting"
@@ -522,6 +523,9 @@ func RepoAssignment(ctx *Context) (cancel context.CancelFunc) {
522523
ctx.Data["ExposeAnonSSH"] = setting.SSH.ExposeAnonymous
523524
ctx.Data["DisableHTTP"] = setting.Repository.DisableHTTPGit
524525
ctx.Data["RepoSearchEnabled"] = setting.Indexer.RepoIndexerEnabled
526+
if setting.Indexer.RepoIndexerEnabled {
527+
ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable()
528+
}
525529
ctx.Data["CloneLink"] = repo.CloneLink()
526530
ctx.Data["UncycloCloneLink"] = repo.UncycloCloneLink()
527531

modules/indexer/code/bleve.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,15 @@ func (b *BleveIndexer) Close() {
271271
log.Info("PID: %d Repository Indexer closed", os.Getpid())
272272
}
273273

274+
// SetAvailabilityChangeCallback does nothing
275+
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
276+
}
277+
278+
// Ping does nothing
279+
func (b *BleveIndexer) Ping() bool {
280+
return true
281+
}
282+
274283
// Index indexes the data
275284
func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
276285
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)

modules/indexer/code/elastic_search.go

Lines changed: 76 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ package code
77
import (
88
"bufio"
99
"context"
10+
"errors"
1011
"fmt"
1112
"io"
13+
"net"
1214
"strconv"
1315
"strings"
1416
"time"
@@ -39,8 +41,11 @@ var _ Indexer = &ElasticSearchIndexer{}
3941

4042
// ElasticSearchIndexer implements Indexer interface
4143
type ElasticSearchIndexer struct {
42-
client *elastic.Client
43-
indexerAliasName string
44+
client *elastic.Client
45+
indexerAliasName string
46+
available bool
47+
availabilityCallback func(bool)
48+
stopTimer chan struct{}
4449
}
4550

4651
type elasticLogger struct {
@@ -78,7 +83,23 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bo
7883
indexer := &ElasticSearchIndexer{
7984
client: client,
8085
indexerAliasName: indexerName,
86+
available: true,
87+
stopTimer: make(chan struct{}),
8188
}
89+
90+
ticker := time.NewTicker(10 * time.Second)
91+
go func() {
92+
for {
93+
select {
94+
case <-ticker.C:
95+
indexer.checkAvailability()
96+
case <-indexer.stopTimer:
97+
ticker.Stop()
98+
return
99+
}
100+
}
101+
}()
102+
82103
exists, err := indexer.init()
83104
if err != nil {
84105
indexer.Close()
@@ -126,14 +147,14 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
126147
ctx := context.Background()
127148
exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
128149
if err != nil {
129-
return false, err
150+
return false, b.checkError(err)
130151
}
131152
if !exists {
132153
mapping := defaultMapping
133154

134155
createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
135156
if err != nil {
136-
return false, err
157+
return false, b.checkError(err)
137158
}
138159
if !createIndex.Acknowledged {
139160
return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
@@ -143,7 +164,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
143164
// check version
144165
r, err := b.client.Aliases().Do(ctx)
145166
if err != nil {
146-
return false, err
167+
return false, b.checkError(err)
147168
}
148169

149170
realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
@@ -152,10 +173,10 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
152173
Add(b.realIndexerName(), b.indexerAliasName).
153174
Do(ctx)
154175
if err != nil {
155-
return false, err
176+
return false, b.checkError(err)
156177
}
157178
if !res.Acknowledged {
158-
return false, fmt.Errorf("")
179+
return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
159180
}
160181
} else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
161182
log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
@@ -165,16 +186,26 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
165186
Add(b.realIndexerName(), b.indexerAliasName).
166187
Do(ctx)
167188
if err != nil {
168-
return false, err
189+
return false, b.checkError(err)
169190
}
170191
if !res.Acknowledged {
171-
return false, fmt.Errorf("")
192+
return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName())
172193
}
173194
}
174195

175196
return exists, nil
176197
}
177198

199+
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
200+
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
201+
b.availabilityCallback = callback
202+
}
203+
204+
// Ping checks if elastic is available
205+
func (b *ElasticSearchIndexer) Ping() bool {
206+
return b.available
207+
}
208+
178209
func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) {
179210
// Ignore vendored files in code search
180211
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
@@ -190,7 +221,7 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr
190221
return nil, err
191222
}
192223
if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil {
193-
return nil, fmt.Errorf("Misformatted git cat-file output: %v", err)
224+
return nil, fmt.Errorf("misformatted git cat-file output: %v", err)
194225
}
195226
}
196227

@@ -275,7 +306,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
275306
Index(b.indexerAliasName).
276307
Add(reqs...).
277308
Do(context.Background())
278-
return err
309+
return b.checkError(err)
279310
}
280311
return nil
281312
}
@@ -285,7 +316,7 @@ func (b *ElasticSearchIndexer) Delete(repoID int64) error {
285316
_, err := b.client.DeleteByQuery(b.indexerAliasName).
286317
Query(elastic.NewTermsQuery("repo_id", repoID)).
287318
Do(context.Background())
288-
return err
319+
return b.checkError(err)
289320
}
290321

291322
// indexPos find words positions for start and the following end on content. It will
@@ -409,7 +440,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
409440
From(start).Size(pageSize).
410441
Do(context.Background())
411442
if err != nil {
412-
return 0, nil, nil, err
443+
return 0, nil, nil, b.checkError(err)
413444
}
414445

415446
return convertResult(searchResult, kw, pageSize)
@@ -423,7 +454,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
423454
Size(0). // We only needs stats information
424455
Do(context.Background())
425456
if err != nil {
426-
return 0, nil, nil, err
457+
return 0, nil, nil, b.checkError(err)
427458
}
428459

429460
query = query.Must(langQuery)
@@ -440,7 +471,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
440471
From(start).Size(pageSize).
441472
Do(context.Background())
442473
if err != nil {
443-
return 0, nil, nil, err
474+
return 0, nil, nil, b.checkError(err)
444475
}
445476

446477
total, hits, _, err := convertResult(searchResult, kw, pageSize)
@@ -449,4 +480,33 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
449480
}
450481

451482
// Close implements indexer
452-
func (b *ElasticSearchIndexer) Close() {}
483+
func (b *ElasticSearchIndexer) Close() {
484+
close(b.stopTimer)
485+
}
486+
487+
func (b *ElasticSearchIndexer) checkError(err error) error {
488+
var opErr *net.OpError
489+
if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
490+
b.available = false
491+
if b.availabilityCallback != nil {
492+
b.availabilityCallback(b.available)
493+
}
494+
}
495+
return err
496+
}
497+
498+
func (b *ElasticSearchIndexer) checkAvailability() {
499+
if b.available {
500+
return
501+
}
502+
503+
// Request cluster state to check if elastic is available again
504+
_, err := b.client.ClusterState().Do(context.Background())
505+
if err != nil {
506+
return
507+
}
508+
b.available = true
509+
if b.availabilityCallback != nil {
510+
b.availabilityCallback(b.available)
511+
}
512+
}

modules/indexer/code/indexer.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type SearchResultLanguages struct {
4242

4343
// Indexer defines an interface to index and search code contents
4444
type Indexer interface {
45+
Ping() bool
46+
SetAvailabilityChangeCallback(callback func(bool))
4547
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
4648
Delete(repoID int64) error
4749
Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
@@ -140,6 +142,7 @@ func Init() {
140142
return data
141143
}
142144

145+
unhandled := make([]queue.Data, 0, len(data))
143146
for _, datum := range data {
144147
indexerData, ok := datum.(*IndexerData)
145148
if !ok {
@@ -150,10 +153,14 @@ func Init() {
150153

151154
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
152155
log.Error("index: %v", err)
153-
continue
156+
if indexer.Ping() {
157+
continue
158+
}
159+
// Add back to queue
160+
unhandled = append(unhandled, datum)
154161
}
155162
}
156-
return nil
163+
return unhandled
157164
}
158165

159166
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
@@ -212,6 +219,18 @@ func Init() {
212219

213220
indexer.set(rIndexer)
214221

222+
if queue, ok := indexerQueue.(queue.Pausable); ok {
223+
rIndexer.SetAvailabilityChangeCallback(func(available bool) {
224+
if !available {
225+
log.Info("Code index queue paused")
226+
queue.Pause()
227+
} else {
228+
log.Info("Code index queue resumed")
229+
queue.Resume()
230+
}
231+
})
232+
}
233+
215234
// Start processing the queue
216235
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
217236

@@ -262,6 +281,17 @@ func UpdateRepoIndexer(repo *repo_model.Repository) {
262281
}
263282
}
264283

284+
// IsAvailable checks if issue indexer is available
285+
func IsAvailable() bool {
286+
idx, err := indexer.get()
287+
if err != nil {
288+
log.Error("IsAvailable(): unable to get indexer: %v", err)
289+
return false
290+
}
291+
292+
return idx.Ping()
293+
}
294+
265295
// populateRepoIndexer populate the repo indexer with pre-existing data. This
266296
// should only be run when the indexer is created for the first time.
267297
func populateRepoIndexer(ctx context.Context) {

modules/indexer/code/wrapped.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111

1212
repo_model "code.gitea.io/gitea/models/repo"
13+
"code.gitea.io/gitea/modules/log"
1314
)
1415

1516
var indexer = newWrappedIndexer()
@@ -56,6 +57,26 @@ func (w *wrappedIndexer) get() (Indexer, error) {
5657
return w.internal, nil
5758
}
5859

60+
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
61+
func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
62+
indexer, err := w.get()
63+
if err != nil {
64+
log.Error("Failed to get indexer: %v", err)
65+
return
66+
}
67+
indexer.SetAvailabilityChangeCallback(callback)
68+
}
69+
70+
// Ping checks if elastic is available
71+
func (w *wrappedIndexer) Ping() bool {
72+
indexer, err := w.get()
73+
if err != nil {
74+
log.Warn("Failed to get indexer: %v", err)
75+
return false
76+
}
77+
return indexer.Ping()
78+
}
79+
5980
func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error {
6081
indexer, err := w.get()
6182
if err != nil {

options/locale/locale_en-US.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ search = Search
268268
code = Code
269269
search.fuzzy = Fuzzy
270270
search.match = Match
271+
code_search_unavailable = Currently code search is not available. Please contact your site administrator.
271272
repo_no_results = No matching repositories found.
272273
user_no_results = No matching users found.
273274
org_no_results = No matching organizations found.
@@ -1708,6 +1709,8 @@ search.search_repo = Search repository
17081709
search.fuzzy = Fuzzy
17091710
search.match = Match
17101711
search.results = Search results for "%s" in <a href="%s">%s</a>
1712+
search.code_no_results = No source code matching your search term found.
1713+
search.code_search_unavailable = Currently code search is not available. Please contact your site administrator.
17111714

17121715
settings = Settings
17131716
settings.desc = Settings is where you can manage the settings for the repository

0 commit comments

Comments
 (0)