Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 934e08f

Browse files
committed
gitserver: Add progressWriter to VCS fetch
Clone already did this, now Fetch does the same. This will allow to also get access to the progress while it's happening, not only afterwards. Test plan: Existing clone / update tests still pass.
1 parent a2b170d commit 934e08f

File tree

9 files changed

+105
-97
lines changed

9 files changed

+105
-97
lines changed

cmd/gitserver/internal/server.go

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ type Server struct {
184184
wg sync.WaitGroup // tracks running background jobs
185185

186186
// cloneLimiter limits the number of concurrent
187-
// clones. Use s.acquireCloneLimiter() and instead of using it directly.
187+
// clones.
188188
cloneLimiter *limiter.MutableLimiter
189189

190190
// rpsLimiter limits the remote code host git operations done per second
@@ -249,14 +249,6 @@ func (s *Server) getRemoteURL(ctx context.Context, name api.RepoName) (*vcs.URL,
249249
return vcs.ParseURL(remoteURL)
250250
}
251251

252-
// acquireCloneLimiter() acquires a cancellable context associated with the
253-
// clone limiter.
254-
func (s *Server) acquireCloneLimiter(ctx context.Context) (context.Context, context.CancelFunc, error) {
255-
pendingClones.Inc()
256-
defer pendingClones.Dec()
257-
return s.cloneLimiter.Acquire(ctx)
258-
}
259-
260252
func (s *Server) IsRepoCloneable(ctx context.Context, repo api.RepoName) (protocol.IsRepoCloneableResponse, error) {
261253
// We use an internal actor here as the repo may be private. It is safe since all
262254
// we return is a bool indicating whether the repo is cloneable or not. Perhaps
@@ -323,7 +315,9 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e
323315

324316
// Use caller context, if the caller is not interested anymore before we
325317
// start cloning, we can skip the clone altogether.
326-
_, cancelCloneLimiter, err := s.acquireCloneLimiter(ctx)
318+
pendingClones.Inc()
319+
_, cancelCloneLimiter, err := s.cloneLimiter.Acquire(ctx)
320+
pendingClones.Dec()
327321
if err != nil {
328322
lock.Release()
329323
return err
@@ -371,7 +365,7 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e
371365
repoClonedCounter.Inc()
372366
logger.Info("cloned repo", log.String("repo", string(repoName)))
373367
} else {
374-
if err := s.doRepoUpdate(ctx, repoName); err != nil {
368+
if err := s.doRepoUpdate(ctx, repoName, lock); err != nil {
375369
// The repo update might have failed due to the repo being corrupt
376370
s.LogIfCorrupt(ctx, repoName, err)
377371

@@ -502,7 +496,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
502496
// produced, the ideal solution would be that readCloneProgress stores it in
503497
// chunks.
504498
output := &linebasedBufferedWriter{}
505-
eg := readCloneProgress(logger, lock, io.TeeReader(progressReader, output), repo)
499+
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)
506500

507501
cloneTimeout := conf.GitLongCommandTimeout()
508502
cloneCtx, cancel := context.WithTimeout(ctx, cloneTimeout)
@@ -516,7 +510,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
516510
}
517511

518512
// best-effort update the output of the clone
519-
if err := s.db.GitserverRepos().SetLastOutput(context.Background(), repo, output.String()); err != nil {
513+
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
520514
s.logger.Error("Setting last output in DB", log.Error(err))
521515
}
522516

@@ -602,19 +596,19 @@ func (w *linebasedBufferedWriter) Bytes() []byte {
602596
return w.buf
603597
}
604598

605-
// readCloneProgress scans the reader and saves the most recent line of output
599+
// readFetchProgress scans the reader and saves the most recent line of output
606600
// as the lock status, and optionally writes to a log file if siteConfig.cloneProgressLog
607601
// is enabled.
608-
func readCloneProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
602+
func readFetchProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
609603
var logFile *os.File
610604

611605
if conf.Get().CloneProgressLog {
612606
var err error
613607
logFile, err = os.CreateTemp("", "")
614608
if err != nil {
615-
logger.Warn("failed to create temporary clone log file", log.Error(err), log.String("repo", string(repo)))
609+
logger.Warn("failed to create temporary fetch log file", log.Error(err), log.String("repo", string(repo)))
616610
} else {
617-
logger.Info("logging clone output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
611+
logger.Info("logging fetch output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
618612
defer logFile.Close()
619613
}
620614
}
@@ -691,7 +685,7 @@ var (
691685

692686
var doBackgroundRepoUpdateMock func(api.RepoName) error
693687

694-
func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error) {
688+
func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName, lock RepositoryLock) (err error) {
695689
logger := s.logger.Scoped("repoUpdate").With(log.String("repo", string(repo)))
696690

697691
if doBackgroundRepoUpdateMock != nil {
@@ -722,21 +716,35 @@ func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error
722716
// ensure the background update doesn't hang forever
723717
fetchCtx, cancelTimeout := context.WithTimeout(ctx, fetchTimeout)
724718
defer cancelTimeout()
725-
output, err := syncer.Fetch(fetchCtx, repo, dir)
726-
// best-effort update the output of the fetch
727-
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, string(output)); err != nil {
728-
s.logger.Warn("Setting last output in DB", log.Error(err))
719+
720+
progressReader, progressWriter := io.Pipe()
721+
// We also capture the entire output in memory for the call to SetLastOutput
722+
// further down.
723+
// TODO: This might require a lot of memory depending on the amount of logs
724+
// produced, the ideal solution would be that readCloneProgress stores it in
725+
// chunks.
726+
output := &linebasedBufferedWriter{}
727+
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)
728+
729+
fetchErr := syncer.Fetch(fetchCtx, repo, dir, progressWriter)
730+
progressWriter.Close()
731+
732+
if err := eg.Wait(); err != nil {
733+
s.logger.Error("reading fetch progress", log.Error(err))
729734
}
730735

731-
if err != nil {
736+
// best-effort store the output of the fetch
737+
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
738+
s.logger.Error("Setting last output in DB", log.Error(err))
739+
}
740+
741+
if fetchErr != nil {
732742
if err := fetchCtx.Err(); err != nil {
733743
return err
734744
}
735-
if output != nil {
736-
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, string(output))
737-
} else {
738-
return errors.Wrapf(err, "failed to fetch repo %q", repo)
739-
}
745+
// TODO: Should we really return the entire output here in an error?
746+
// It could be a super big error string.
747+
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, output.String())
740748
}
741749

742750
return postRepoFetchActions(ctx, logger, s.fs, s.db, s.getBackendFunc(dir, repo), s.hostname, repo, dir, syncer)

cmd/gitserver/internal/server_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ type mockVCSSyncer struct {
11411141
mockTypeFunc func() string
11421142
mockIsCloneable func(ctx context.Context, repoName api.RepoName) error
11431143
mockClone func(ctx context.Context, repo api.RepoName, targetDir common.GitDir, tmpPath string, progressWriter io.Writer) error
1144-
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error)
1144+
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error
11451145
}
11461146

11471147
func (m *mockVCSSyncer) Type() string {
@@ -1168,12 +1168,12 @@ func (m *mockVCSSyncer) Clone(ctx context.Context, repo api.RepoName, targetDir
11681168
return errors.New("no mock for Clone() is set")
11691169
}
11701170

1171-
func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
1171+
func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
11721172
if m.mockFetch != nil {
1173-
return m.mockFetch(ctx, repoName, dir)
1173+
return m.mockFetch(ctx, repoName, dir, progressWriter)
11741174
}
11751175

1176-
return nil, errors.New("no mock for Fetch() is set")
1176+
return errors.New("no mock for Fetch() is set")
11771177
}
11781178

11791179
var _ vcssyncer.VCSSyncer = &mockVCSSyncer{}

cmd/gitserver/internal/vcssyncer/git.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package vcssyncer
22

33
import (
4-
"bytes"
54
"context"
65
"io"
76
"os"
@@ -162,24 +161,22 @@ func (s *gitRepoSyncer) Clone(ctx context.Context, repo api.RepoName, _ common.G
162161
}
163162

164163
// Fetch tries to fetch updates of a Git repository.
165-
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
164+
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
166165
source, err := s.getRemoteURLSource(ctx, repoName)
167166
if err != nil {
168-
return nil, errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
167+
return errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
169168
}
170169

171-
var output bytes.Buffer
172-
173170
// Fetch the remote contents.
174171
{
175-
tryWrite(s.logger, &output, "Fetching remote contents\n")
172+
tryWrite(s.logger, progressWriter, "Fetching remote contents\n")
176173

177-
exitCode, err := s.runFetchCommand(ctx, repoName, source, &output, dir)
174+
exitCode, err := s.runFetchCommand(ctx, repoName, source, progressWriter, dir)
178175
if err != nil {
179-
return nil, errors.Wrapf(err, "exit code: %d, failed to fetch from remote: %s", exitCode, output.String())
176+
return errors.Wrapf(err, "exit code: %d, failed to fetch from remote", exitCode)
180177
}
181178

182-
tryWrite(s.logger, &output, "Fetched remote contents\n")
179+
tryWrite(s.logger, progressWriter, "Fetched remote contents\n")
183180

184181
}
185182

@@ -189,17 +186,17 @@ func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir co
189186

190187
// Set the local HEAD to the remote HEAD.
191188
{
192-
tryWrite(s.logger, &output, "Setting local HEAD to remote HEAD\n")
189+
tryWrite(s.logger, progressWriter, "Setting local HEAD to remote HEAD\n")
193190

194191
err := s.setHEAD(ctx, repoName, dir, source)
195192
if err != nil {
196-
return nil, errors.Wrap(err, "failed to set local HEAD to remote HEAD")
193+
return errors.Wrap(err, "failed to set local HEAD to remote HEAD")
197194
}
198195

199-
tryWrite(s.logger, &output, "Finished setting local HEAD to remote HEAD\n")
196+
tryWrite(s.logger, progressWriter, "Finished setting local HEAD to remote HEAD\n")
200197
}
201198

202-
return output.Bytes(), nil
199+
return nil
203200
}
204201

205202
func (s *gitRepoSyncer) runFetchCommand(ctx context.Context, repoName api.RepoName, source RemoteURLSource, progressWriter io.Writer, dir common.GitDir) (exitCode int, err error) {

cmd/gitserver/internal/vcssyncer/instrumented_syncer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ func (i *instrumentedSyncer) Clone(ctx context.Context, repo api.RepoName, targe
9696
return i.base.Clone(ctx, repo, targetDir, tmpPath, progressWriter)
9797
}
9898

99-
func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) (output []byte, err error) {
99+
func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) (err error) {
100100
if !i.shouldObserve() {
101-
return i.base.Fetch(ctx, repoName, dir)
101+
return i.base.Fetch(ctx, repoName, dir, progressWriter)
102102
}
103103

104104
start := time.Now()
@@ -109,7 +109,7 @@ func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, d
109109
metricFetchDuration.WithLabelValues(i.formattedTypeLabel, strconv.FormatBool(succeeded)).Observe(duration)
110110
}()
111111

112-
return i.base.Fetch(ctx, repoName, dir)
112+
return i.base.Fetch(ctx, repoName, dir, progressWriter)
113113
}
114114

115115
func (i *instrumentedSyncer) shouldObserve() bool {

cmd/gitserver/internal/vcssyncer/mock.go

Lines changed: 23 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)