Skip to content

Make indexer code more reusable #2590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions models/issue_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func InitIssueIndexer() {

// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error {
batch := indexer.IssueIndexerBatch()
for page := 1; ; page++ {
repos, _, err := Repositories(&SearchRepoOptions{
Page: page,
Expand All @@ -34,37 +35,45 @@ func populateIssueIndexer() error {
return fmt.Errorf("Repositories: %v", err)
}
if len(repos) == 0 {
return nil
return batch.Flush()
}
for _, repo := range repos {
issues, err := Issues(&IssuesOptions{
RepoID: repo.ID,
IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone,
})
updates := make([]indexer.IssueIndexerUpdate, len(issues))
for i, issue := range issues {
updates[i] = issue.update()
if err != nil {
return err
}
if err = indexer.BatchUpdateIssues(updates...); err != nil {
return fmt.Errorf("BatchUpdate: %v", err)
for _, issue := range issues {
if err := batch.Add(issue.update()); err != nil {
return err
}
}
}
}
}

func processIssueIndexerUpdateQueue() {
batch := indexer.IssueIndexerBatch()
for {
var issueID int64
select {
case issueID := <-issueIndexerUpdateQueue:
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "issuesIndexer.Index: %v", err)
continue
}
if err = indexer.UpdateIssue(issue.update()); err != nil {
log.Error(4, "issuesIndexer.Index: %v", err)
case issueID = <-issueIndexerUpdateQueue:
default:
// flush whatever updates we currently have, since we
// might have to wait a while
if err := batch.Flush(); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
issueID = <-issueIndexerUpdateQueue
}
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = batch.Add(issue.update()); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}
}
Expand Down
49 changes: 49 additions & 0 deletions modules/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strconv"

"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
)

Expand Down Expand Up @@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase
q.Analyzer = analyzer
return q
}

const unicodeNormalizeName = "unicodeNormalize"

func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
})
}

// Update represents an update to an indexer
type Update interface {
addToBatch(batch *bleve.Batch) error
}

const maxBatchSize = 16

// Batch batch of indexer updates that automatically flushes once it
// reaches a certain size
type Batch struct {
batch *bleve.Batch
index bleve.Index
}

// Add add update to batch, possibly flushing
func (batch *Batch) Add(update Update) error {
if err := update.addToBatch(batch.batch); err != nil {
return err
}
return batch.flushIfFull()
}

func (batch *Batch) flushIfFull() error {
if batch.batch.Size() >= maxBatchSize {
return batch.Flush()
}
return nil
}

// Flush manually flush the batch, regardless of its size
func (batch *Batch) Flush() error {
if err := batch.index.Batch(batch.batch); err != nil {
return err
}
batch.batch.Reset()
return nil
}
32 changes: 11 additions & 21 deletions modules/indexer/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
)
Expand All @@ -35,6 +34,10 @@ type IssueIndexerUpdate struct {
Data *IssueIndexerData
}

func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
return batch.Index(indexerID(update.IssueID), update.Data)
}

const issueIndexerAnalyzer = "issueIndexer"

// InitIssueIndexer initialize issue indexer
Expand Down Expand Up @@ -74,17 +77,13 @@ func createIssueIndexer() error {
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)

const unicodeNormNFC = "unicodeNormNFC"
if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
}); err != nil {
if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
return err
} else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
"type": custom.Name,
"char_filters": []string{},
"tokenizer": unicode.Name,
"token_filters": []string{unicodeNormNFC, lowercase.Name},
"token_filters": []string{unicodeNormalizeName, lowercase.Name},
}); err != nil {
return err
}
Expand All @@ -97,21 +96,12 @@ func createIssueIndexer() error {
return err
}

// UpdateIssue update the issue indexer
func UpdateIssue(update IssueIndexerUpdate) error {
return issueIndexer.Index(indexerID(update.IssueID), update.Data)
}

// BatchUpdateIssues perform a batch update of the issue indexer
func BatchUpdateIssues(updates ...IssueIndexerUpdate) error {
batch := issueIndexer.NewBatch()
for _, update := range updates {
err := batch.Index(indexerID(update.IssueID), update.Data)
if err != nil {
return err
}
// IssueIndexerBatch batch to add updates to
func IssueIndexerBatch() *Batch {
return &Batch{
batch: issueIndexer.NewBatch(),
index: issueIndexer,
}
return issueIndexer.Batch(batch)
}

// SearchIssuesByKeyword searches for issues by given conditions.
Expand Down