Skip to content

Commit 523efa4

Browse files
zeripath6543lafriks
authored
Move Bleve and Elastic code indexers to use a common cat-file --batch (#14781)
* Extract out the common cat-file batch calls Signed-off-by: Andrew Thornton <[email protected]> * Move bleve and elastic indexers to use a common cat-file --batch when indexing Signed-off-by: Andrew Thornton <[email protected]> * move catfilebatch to batch_reader and rename to batch_reader.go Signed-off-by: Andrew Thornton <[email protected]> Co-authored-by: 6543 <[email protected]> Co-authored-by: Lauris BH <[email protected]>
1 parent 0044e80 commit 523efa4

File tree

6 files changed

+91
-87
lines changed

6 files changed

+91
-87
lines changed

modules/git/batch_reader_nogogit.go renamed to modules/git/batch_reader.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,48 @@
22
// Use of this source code is governed by a MIT-style
33
// license that can be found in the LICENSE file.
44

5-
// +build !gogit
6-
75
package git
86

97
import (
108
"bufio"
119
"bytes"
10+
"io"
1211
"math"
1312
"strconv"
13+
"strings"
1414
)
1515

16+
// CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function
17+
func CatFileBatch(repoPath string) (*io.PipeWriter, *bufio.Reader, func()) {
18+
// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary.
19+
// so let's create a batch stdin and stdout
20+
batchStdinReader, batchStdinWriter := io.Pipe()
21+
batchStdoutReader, batchStdoutWriter := io.Pipe()
22+
cancel := func() {
23+
_ = batchStdinReader.Close()
24+
_ = batchStdinWriter.Close()
25+
_ = batchStdoutReader.Close()
26+
_ = batchStdoutWriter.Close()
27+
}
28+
29+
go func() {
30+
stderr := strings.Builder{}
31+
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repoPath, batchStdoutWriter, &stderr, batchStdinReader)
32+
if err != nil {
33+
_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
34+
_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String()))
35+
} else {
36+
_ = batchStdoutWriter.Close()
37+
_ = batchStdinReader.Close()
38+
}
39+
}()
40+
41+
// For simplicities sake we'll us a buffered reader to read from the cat-file --batch
42+
batchReader := bufio.NewReader(batchStdoutReader)
43+
44+
return batchStdinWriter, batchReader, cancel
45+
}
46+
1647
// ReadBatchLine reads the header line from cat-file --batch
1748
// We expect:
1849
// <sha> SP <type> SP <size> LF

modules/git/commit_info_nogogit.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -141,29 +141,8 @@ func GetLastCommitForPaths(commit *Commit, treePath string, paths []string) ([]*
141141
}
142142
}()
143143

144-
// We feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary.
145-
// so let's create a batch stdin and stdout
146-
batchStdinReader, batchStdinWriter := io.Pipe()
147-
batchStdoutReader, batchStdoutWriter := io.Pipe()
148-
defer func() {
149-
_ = batchStdinReader.Close()
150-
_ = batchStdinWriter.Close()
151-
_ = batchStdoutReader.Close()
152-
_ = batchStdoutWriter.Close()
153-
}()
154-
155-
go func() {
156-
stderr := strings.Builder{}
157-
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(commit.repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
158-
if err != nil {
159-
_ = revListWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
160-
} else {
161-
_ = revListWriter.Close()
162-
}
163-
}()
164-
165-
// For simplicities sake we'll us a buffered reader
166-
batchReader := bufio.NewReader(batchStdoutReader)
144+
batchStdinWriter, batchReader, cancel := CatFileBatch(commit.repo.Path)
145+
defer cancel()
167146

168147
mapsize := 4096
169148
if len(paths) > mapsize {

modules/git/pipeline/lfs_nogogit.go

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,27 +64,8 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) {
6464

6565
// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary.
6666
// so let's create a batch stdin and stdout
67-
batchStdinReader, batchStdinWriter := io.Pipe()
68-
batchStdoutReader, batchStdoutWriter := io.Pipe()
69-
defer func() {
70-
_ = batchStdinReader.Close()
71-
_ = batchStdinWriter.Close()
72-
_ = batchStdoutReader.Close()
73-
_ = batchStdoutWriter.Close()
74-
}()
75-
76-
go func() {
77-
stderr := strings.Builder{}
78-
err := git.NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
79-
if err != nil {
80-
_ = revListWriter.CloseWithError(git.ConcatenateError(err, (&stderr).String()))
81-
} else {
82-
_ = revListWriter.Close()
83-
}
84-
}()
85-
86-
// For simplicities sake we'll us a buffered reader to read from the cat-file --batch
87-
batchReader := bufio.NewReader(batchStdoutReader)
67+
batchStdinWriter, batchReader, cancel := git.CatFileBatch(repo.Path)
68+
defer cancel()
8869

8970
// We'll use a scanner for the revList because it's simpler than a bufio.Reader
9071
scan := bufio.NewScanner(revListReader)

modules/git/repo_language_stats_nogogit.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"bytes"
1212
"io"
1313
"math"
14-
"strings"
1514

1615
"code.gitea.io/gitea/modules/analyze"
1716

@@ -22,30 +21,8 @@ import (
2221
func (repo *Repository) GetLanguageStats(commitID string) (map[string]int64, error) {
2322
// We will feed the commit IDs in order into cat-file --batch, followed by blobs as necessary.
2423
// so let's create a batch stdin and stdout
25-
26-
batchStdinReader, batchStdinWriter := io.Pipe()
27-
batchStdoutReader, batchStdoutWriter := io.Pipe()
28-
defer func() {
29-
_ = batchStdinReader.Close()
30-
_ = batchStdinWriter.Close()
31-
_ = batchStdoutReader.Close()
32-
_ = batchStdoutWriter.Close()
33-
}()
34-
35-
go func() {
36-
stderr := strings.Builder{}
37-
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
38-
if err != nil {
39-
_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
40-
_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String()))
41-
} else {
42-
_ = batchStdoutWriter.Close()
43-
_ = batchStdinReader.Close()
44-
}
45-
}()
46-
47-
// For simplicities sake we'll us a buffered reader
48-
batchReader := bufio.NewReader(batchStdoutReader)
24+
batchStdinWriter, batchReader, cancel := CatFileBatch(repo.Path)
25+
defer cancel()
4926

5027
writeID := func(id string) error {
5128
_, err := batchStdinWriter.Write([]byte(id))

modules/indexer/code/bleve.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
package code
66

77
import (
8+
"bufio"
89
"fmt"
10+
"io"
11+
"io/ioutil"
912
"os"
1013
"strconv"
1114
"strings"
@@ -173,7 +176,7 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
173176
return indexer, created, err
174177
}
175178

176-
func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
179+
func (b *BleveIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
177180
// Ignore vendored files in code search
178181
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
179182
return nil
@@ -196,8 +199,16 @@ func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *mode
196199
return b.addDelete(update.Filename, repo, batch)
197200
}
198201

199-
fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
200-
RunInDirBytes(repo.RepoPath())
202+
if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
203+
return err
204+
}
205+
206+
_, _, size, err := git.ReadBatchLine(batchReader)
207+
if err != nil {
208+
return err
209+
}
210+
211+
fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size))
201212
if err != nil {
202213
return err
203214
} else if !base.IsTextFile(fileContents) {
@@ -254,10 +265,17 @@ func (b *BleveIndexer) Close() {
254265
// Index indexes the data
255266
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
256267
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
257-
for _, update := range changes.Updates {
258-
if err := b.addUpdate(sha, update, repo, batch); err != nil {
259-
return err
268+
if len(changes.Updates) > 0 {
269+
270+
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
271+
defer cancel()
272+
273+
for _, update := range changes.Updates {
274+
if err := b.addUpdate(batchWriter, batchReader, sha, update, repo, batch); err != nil {
275+
return err
276+
}
260277
}
278+
cancel()
261279
}
262280
for _, filename := range changes.RemovedFilenames {
263281
if err := b.addDelete(filename, repo, batch); err != nil {

modules/indexer/code/elastic_search.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
package code
66

77
import (
8+
"bufio"
89
"context"
910
"fmt"
11+
"io"
12+
"io/ioutil"
1013
"strconv"
1114
"strings"
1215
"time"
@@ -172,7 +175,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
172175
return exists, nil
173176
}
174177

175-
func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
178+
func (b *ElasticSearchIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
176179
// Ignore vendored files in code search
177180
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
178181
return nil, nil
@@ -195,8 +198,16 @@ func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *mo
195198
return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
196199
}
197200

198-
fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
199-
RunInDirBytes(repo.RepoPath())
201+
if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
202+
return nil, err
203+
}
204+
205+
_, _, size, err := git.ReadBatchLine(batchReader)
206+
if err != nil {
207+
return nil, err
208+
}
209+
210+
fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size))
200211
if err != nil {
201212
return nil, err
202213
} else if !base.IsTextFile(fileContents) {
@@ -230,14 +241,21 @@ func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repositor
230241
// Index will save the index data
231242
func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
232243
reqs := make([]elastic.BulkableRequest, 0)
233-
for _, update := range changes.Updates {
234-
updateReqs, err := b.addUpdate(sha, update, repo)
235-
if err != nil {
236-
return err
237-
}
238-
if len(updateReqs) > 0 {
239-
reqs = append(reqs, updateReqs...)
244+
if len(changes.Updates) > 0 {
245+
246+
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
247+
defer cancel()
248+
249+
for _, update := range changes.Updates {
250+
updateReqs, err := b.addUpdate(batchWriter, batchReader, sha, update, repo)
251+
if err != nil {
252+
return err
253+
}
254+
if len(updateReqs) > 0 {
255+
reqs = append(reqs, updateReqs...)
256+
}
240257
}
258+
cancel()
241259
}
242260

243261
for _, filename := range changes.RemovedFilenames {

0 commit comments

Comments
 (0)