Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 24a6cae

Browse files
authored
gitserver: Add progressWriter to VCS fetch (#61863)
Clone already did this, now Fetch does the same. This will allow to also get access to the progress while it's happening, not only afterwards. Test plan: Existing clone / update tests still pass.
1 parent a73ef24 commit 24a6cae

File tree

9 files changed

+105
-97
lines changed

9 files changed

+105
-97
lines changed

cmd/gitserver/internal/server.go

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ type Server struct {
184184
wg sync.WaitGroup // tracks running background jobs
185185

186186
// cloneLimiter limits the number of concurrent
187-
// clones. Use s.acquireCloneLimiter() and instead of using it directly.
187+
// clones.
188188
cloneLimiter *limiter.MutableLimiter
189189

190190
// rpsLimiter limits the remote code host git operations done per second
@@ -249,14 +249,6 @@ func (s *Server) getRemoteURL(ctx context.Context, name api.RepoName) (*vcs.URL,
249249
return vcs.ParseURL(remoteURL)
250250
}
251251

252-
// acquireCloneLimiter() acquires a cancellable context associated with the
253-
// clone limiter.
254-
func (s *Server) acquireCloneLimiter(ctx context.Context) (context.Context, context.CancelFunc, error) {
255-
pendingClones.Inc()
256-
defer pendingClones.Dec()
257-
return s.cloneLimiter.Acquire(ctx)
258-
}
259-
260252
func (s *Server) IsRepoCloneable(ctx context.Context, repo api.RepoName) (protocol.IsRepoCloneableResponse, error) {
261253
// We use an internal actor here as the repo may be private. It is safe since all
262254
// we return is a bool indicating whether the repo is cloneable or not. Perhaps
@@ -323,7 +315,9 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e
323315

324316
// Use caller context, if the caller is not interested anymore before we
325317
// start cloning, we can skip the clone altogether.
326-
_, cancelCloneLimiter, err := s.acquireCloneLimiter(ctx)
318+
pendingClones.Inc()
319+
_, cancelCloneLimiter, err := s.cloneLimiter.Acquire(ctx)
320+
pendingClones.Dec()
327321
if err != nil {
328322
lock.Release()
329323
return err
@@ -371,7 +365,7 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e
371365
repoClonedCounter.Inc()
372366
logger.Info("cloned repo", log.String("repo", string(repoName)))
373367
} else {
374-
if err := s.doRepoUpdate(ctx, repoName); err != nil {
368+
if err := s.doRepoUpdate(ctx, repoName, lock); err != nil {
375369
// The repo update might have failed due to the repo being corrupt
376370
s.LogIfCorrupt(ctx, repoName, err)
377371

@@ -502,7 +496,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
502496
// produced, the ideal solution would be that readCloneProgress stores it in
503497
// chunks.
504498
output := &linebasedBufferedWriter{}
505-
eg := readCloneProgress(logger, lock, io.TeeReader(progressReader, output), repo)
499+
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)
506500

507501
cloneTimeout := conf.GitLongCommandTimeout()
508502
cloneCtx, cancel := context.WithTimeout(ctx, cloneTimeout)
@@ -516,7 +510,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
516510
}
517511

518512
// best-effort update the output of the clone
519-
if err := s.db.GitserverRepos().SetLastOutput(context.Background(), repo, output.String()); err != nil {
513+
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
520514
s.logger.Error("Setting last output in DB", log.Error(err))
521515
}
522516

@@ -602,19 +596,19 @@ func (w *linebasedBufferedWriter) Bytes() []byte {
602596
return w.buf
603597
}
604598

605-
// readCloneProgress scans the reader and saves the most recent line of output
599+
// readFetchProgress scans the reader and saves the most recent line of output
606600
// as the lock status, and optionally writes to a log file if siteConfig.cloneProgressLog
607601
// is enabled.
608-
func readCloneProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
602+
func readFetchProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
609603
var logFile *os.File
610604

611605
if conf.Get().CloneProgressLog {
612606
var err error
613607
logFile, err = os.CreateTemp("", "")
614608
if err != nil {
615-
logger.Warn("failed to create temporary clone log file", log.Error(err), log.String("repo", string(repo)))
609+
logger.Warn("failed to create temporary fetch log file", log.Error(err), log.String("repo", string(repo)))
616610
} else {
617-
logger.Info("logging clone output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
611+
logger.Info("logging fetch output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
618612
defer logFile.Close()
619613
}
620614
}
@@ -691,7 +685,7 @@ var (
691685

692686
var doBackgroundRepoUpdateMock func(api.RepoName) error
693687

694-
func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error) {
688+
func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName, lock RepositoryLock) (err error) {
695689
logger := s.logger.Scoped("repoUpdate").With(log.String("repo", string(repo)))
696690

697691
if doBackgroundRepoUpdateMock != nil {
@@ -722,21 +716,35 @@ func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error
722716
// ensure the background update doesn't hang forever
723717
fetchCtx, cancelTimeout := context.WithTimeout(ctx, fetchTimeout)
724718
defer cancelTimeout()
725-
output, err := syncer.Fetch(fetchCtx, repo, dir)
726-
// best-effort update the output of the fetch
727-
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, string(output)); err != nil {
728-
s.logger.Warn("Setting last output in DB", log.Error(err))
719+
720+
progressReader, progressWriter := io.Pipe()
721+
// We also capture the entire output in memory for the call to SetLastOutput
722+
// further down.
723+
// TODO: This might require a lot of memory depending on the amount of logs
724+
// produced, the ideal solution would be that readCloneProgress stores it in
725+
// chunks.
726+
output := &linebasedBufferedWriter{}
727+
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)
728+
729+
fetchErr := syncer.Fetch(fetchCtx, repo, dir, progressWriter)
730+
progressWriter.Close()
731+
732+
if err := eg.Wait(); err != nil {
733+
s.logger.Error("reading fetch progress", log.Error(err))
729734
}
730735

731-
if err != nil {
736+
// best-effort store the output of the fetch
737+
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
738+
s.logger.Error("Setting last output in DB", log.Error(err))
739+
}
740+
741+
if fetchErr != nil {
732742
if err := fetchCtx.Err(); err != nil {
733743
return err
734744
}
735-
if output != nil {
736-
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, string(output))
737-
} else {
738-
return errors.Wrapf(err, "failed to fetch repo %q", repo)
739-
}
745+
// TODO: Should we really return the entire output here in an error?
746+
// It could be a super big error string.
747+
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, output.String())
740748
}
741749

742750
return postRepoFetchActions(ctx, logger, s.fs, s.db, s.getBackendFunc(dir, repo), s.hostname, repo, dir, syncer)

cmd/gitserver/internal/server_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ type mockVCSSyncer struct {
11411141
mockTypeFunc func() string
11421142
mockIsCloneable func(ctx context.Context, repoName api.RepoName) error
11431143
mockClone func(ctx context.Context, repo api.RepoName, targetDir common.GitDir, tmpPath string, progressWriter io.Writer) error
1144-
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error)
1144+
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error
11451145
}
11461146

11471147
func (m *mockVCSSyncer) Type() string {
@@ -1168,12 +1168,12 @@ func (m *mockVCSSyncer) Clone(ctx context.Context, repo api.RepoName, targetDir
11681168
return errors.New("no mock for Clone() is set")
11691169
}
11701170

1171-
func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
1171+
func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
11721172
if m.mockFetch != nil {
1173-
return m.mockFetch(ctx, repoName, dir)
1173+
return m.mockFetch(ctx, repoName, dir, progressWriter)
11741174
}
11751175

1176-
return nil, errors.New("no mock for Fetch() is set")
1176+
return errors.New("no mock for Fetch() is set")
11771177
}
11781178

11791179
var _ vcssyncer.VCSSyncer = &mockVCSSyncer{}

cmd/gitserver/internal/vcssyncer/git.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package vcssyncer
22

33
import (
4-
"bytes"
54
"context"
65
"io"
76
"os"
@@ -162,24 +161,22 @@ func (s *gitRepoSyncer) Clone(ctx context.Context, repo api.RepoName, _ common.G
162161
}
163162

164163
// Fetch tries to fetch updates of a Git repository.
165-
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
164+
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
166165
source, err := s.getRemoteURLSource(ctx, repoName)
167166
if err != nil {
168-
return nil, errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
167+
return errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
169168
}
170169

171-
var output bytes.Buffer
172-
173170
// Fetch the remote contents.
174171
{
175-
tryWrite(s.logger, &output, "Fetching remote contents\n")
172+
tryWrite(s.logger, progressWriter, "Fetching remote contents\n")
176173

177-
exitCode, err := s.runFetchCommand(ctx, repoName, source, &output, dir)
174+
exitCode, err := s.runFetchCommand(ctx, repoName, source, progressWriter, dir)
178175
if err != nil {
179-
return nil, errors.Wrapf(err, "exit code: %d, failed to fetch from remote: %s", exitCode, output.String())
176+
return errors.Wrapf(err, "exit code: %d, failed to fetch from remote", exitCode)
180177
}
181178

182-
tryWrite(s.logger, &output, "Fetched remote contents\n")
179+
tryWrite(s.logger, progressWriter, "Fetched remote contents\n")
183180

184181
}
185182

@@ -189,17 +186,17 @@ func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir co
189186

190187
// Set the local HEAD to the remote HEAD.
191188
{
192-
tryWrite(s.logger, &output, "Setting local HEAD to remote HEAD\n")
189+
tryWrite(s.logger, progressWriter, "Setting local HEAD to remote HEAD\n")
193190

194191
err := s.setHEAD(ctx, repoName, dir, source)
195192
if err != nil {
196-
return nil, errors.Wrap(err, "failed to set local HEAD to remote HEAD")
193+
return errors.Wrap(err, "failed to set local HEAD to remote HEAD")
197194
}
198195

199-
tryWrite(s.logger, &output, "Finished setting local HEAD to remote HEAD\n")
196+
tryWrite(s.logger, progressWriter, "Finished setting local HEAD to remote HEAD\n")
200197
}
201198

202-
return output.Bytes(), nil
199+
return nil
203200
}
204201

205202
func (s *gitRepoSyncer) runFetchCommand(ctx context.Context, repoName api.RepoName, source RemoteURLSource, progressWriter io.Writer, dir common.GitDir) (exitCode int, err error) {

cmd/gitserver/internal/vcssyncer/instrumented_syncer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ func (i *instrumentedSyncer) Clone(ctx context.Context, repo api.RepoName, targe
9696
return i.base.Clone(ctx, repo, targetDir, tmpPath, progressWriter)
9797
}
9898

99-
func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) (output []byte, err error) {
99+
func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) (err error) {
100100
if !i.shouldObserve() {
101-
return i.base.Fetch(ctx, repoName, dir)
101+
return i.base.Fetch(ctx, repoName, dir, progressWriter)
102102
}
103103

104104
start := time.Now()
@@ -109,7 +109,7 @@ func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, d
109109
metricFetchDuration.WithLabelValues(i.formattedTypeLabel, strconv.FormatBool(succeeded)).Observe(duration)
110110
}()
111111

112-
return i.base.Fetch(ctx, repoName, dir)
112+
return i.base.Fetch(ctx, repoName, dir, progressWriter)
113113
}
114114

115115
func (i *instrumentedSyncer) shouldObserve() bool {

cmd/gitserver/internal/vcssyncer/mock.go

Lines changed: 23 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)