Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

gitserver: Add progressWriter to VCS fetch #61863

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 36 additions & 28 deletions cmd/gitserver/internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ type Server struct {
wg sync.WaitGroup // tracks running background jobs

// cloneLimiter limits the number of concurrent
// clones. Use s.acquireCloneLimiter() and instead of using it directly.
// clones.
cloneLimiter *limiter.MutableLimiter

// rpsLimiter limits the remote code host git operations done per second
Expand Down Expand Up @@ -249,14 +249,6 @@ func (s *Server) getRemoteURL(ctx context.Context, name api.RepoName) (*vcs.URL,
return vcs.ParseURL(remoteURL)
}

// acquireCloneLimiter() acquires a cancellable context associated with the
// clone limiter.
func (s *Server) acquireCloneLimiter(ctx context.Context) (context.Context, context.CancelFunc, error) {
pendingClones.Inc()
defer pendingClones.Dec()
return s.cloneLimiter.Acquire(ctx)
}

func (s *Server) IsRepoCloneable(ctx context.Context, repo api.RepoName) (protocol.IsRepoCloneableResponse, error) {
// We use an internal actor here as the repo may be private. It is safe since all
// we return is a bool indicating whether the repo is cloneable or not. Perhaps
Expand Down Expand Up @@ -323,7 +315,9 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e

// Use caller context, if the caller is not interested anymore before we
// start cloning, we can skip the clone altogether.
_, cancelCloneLimiter, err := s.acquireCloneLimiter(ctx)
pendingClones.Inc()
_, cancelCloneLimiter, err := s.cloneLimiter.Acquire(ctx)
pendingClones.Dec()
if err != nil {
lock.Release()
return err
Expand Down Expand Up @@ -371,7 +365,7 @@ func (s *Server) repoUpdateOrClone(ctx context.Context, repoName api.RepoName) e
repoClonedCounter.Inc()
logger.Info("cloned repo", log.String("repo", string(repoName)))
} else {
if err := s.doRepoUpdate(ctx, repoName); err != nil {
if err := s.doRepoUpdate(ctx, repoName, lock); err != nil {
// The repo update might have failed due to the repo being corrupt
s.LogIfCorrupt(ctx, repoName, err)

Expand Down Expand Up @@ -502,7 +496,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
// produced, the ideal solution would be that readCloneProgress stores it in
// chunks.
output := &linebasedBufferedWriter{}
eg := readCloneProgress(logger, lock, io.TeeReader(progressReader, output), repo)
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)

cloneTimeout := conf.GitLongCommandTimeout()
cloneCtx, cancel := context.WithTimeout(ctx, cloneTimeout)
Expand All @@ -516,7 +510,7 @@ func (s *Server) cloneRepo(ctx context.Context, repo api.RepoName, lock Reposito
}

// best-effort update the output of the clone
if err := s.db.GitserverRepos().SetLastOutput(context.Background(), repo, output.String()); err != nil {
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
s.logger.Error("Setting last output in DB", log.Error(err))
}

Expand Down Expand Up @@ -602,19 +596,19 @@ func (w *linebasedBufferedWriter) Bytes() []byte {
return w.buf
}

// readCloneProgress scans the reader and saves the most recent line of output
// readFetchProgress scans the reader and saves the most recent line of output
// as the lock status, and optionally writes to a log file if siteConfig.cloneProgressLog
// is enabled.
func readCloneProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
func readFetchProgress(logger log.Logger, lock RepositoryLock, pr io.Reader, repo api.RepoName) *errgroup.Group {
var logFile *os.File

if conf.Get().CloneProgressLog {
var err error
logFile, err = os.CreateTemp("", "")
if err != nil {
logger.Warn("failed to create temporary clone log file", log.Error(err), log.String("repo", string(repo)))
logger.Warn("failed to create temporary fetch log file", log.Error(err), log.String("repo", string(repo)))
} else {
logger.Info("logging clone output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
logger.Info("logging fetch output", log.String("file", logFile.Name()), log.String("repo", string(repo)))
defer logFile.Close()
}
}
Expand Down Expand Up @@ -691,7 +685,7 @@ var (

var doBackgroundRepoUpdateMock func(api.RepoName) error

func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error) {
func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName, lock RepositoryLock) (err error) {
logger := s.logger.Scoped("repoUpdate").With(log.String("repo", string(repo)))

if doBackgroundRepoUpdateMock != nil {
Expand Down Expand Up @@ -722,21 +716,35 @@ func (s *Server) doRepoUpdate(ctx context.Context, repo api.RepoName) (err error
// ensure the background update doesn't hang forever
fetchCtx, cancelTimeout := context.WithTimeout(ctx, fetchTimeout)
defer cancelTimeout()
output, err := syncer.Fetch(fetchCtx, repo, dir)
// best-effort update the output of the fetch
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, string(output)); err != nil {
s.logger.Warn("Setting last output in DB", log.Error(err))

progressReader, progressWriter := io.Pipe()
// We also capture the entire output in memory for the call to SetLastOutput
// further down.
// TODO: This might require a lot of memory depending on the amount of logs
// produced, the ideal solution would be that readCloneProgress stores it in
// chunks.
output := &linebasedBufferedWriter{}
eg := readFetchProgress(logger, lock, io.TeeReader(progressReader, output), repo)

fetchErr := syncer.Fetch(fetchCtx, repo, dir, progressWriter)
progressWriter.Close()

if err := eg.Wait(); err != nil {
s.logger.Error("reading fetch progress", log.Error(err))
}

if err != nil {
// best-effort store the output of the fetch
if err := s.db.GitserverRepos().SetLastOutput(ctx, repo, output.String()); err != nil {
s.logger.Error("Setting last output in DB", log.Error(err))
}

if fetchErr != nil {
if err := fetchCtx.Err(); err != nil {
return err
}
if output != nil {
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, string(output))
} else {
return errors.Wrapf(err, "failed to fetch repo %q", repo)
}
// TODO: Should we really return the entire output here in an error?
// It could be a super big error string.
return errors.Wrapf(err, "failed to fetch repo %q with output %q", repo, output.String())
}

return postRepoFetchActions(ctx, logger, s.fs, s.db, s.getBackendFunc(dir, repo), s.hostname, repo, dir, syncer)
Expand Down
8 changes: 4 additions & 4 deletions cmd/gitserver/internal/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ type mockVCSSyncer struct {
mockTypeFunc func() string
mockIsCloneable func(ctx context.Context, repoName api.RepoName) error
mockClone func(ctx context.Context, repo api.RepoName, targetDir common.GitDir, tmpPath string, progressWriter io.Writer) error
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error)
mockFetch func(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error
}

func (m *mockVCSSyncer) Type() string {
Expand All @@ -1168,12 +1168,12 @@ func (m *mockVCSSyncer) Clone(ctx context.Context, repo api.RepoName, targetDir
return errors.New("no mock for Clone() is set")
}

func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
func (m *mockVCSSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
if m.mockFetch != nil {
return m.mockFetch(ctx, repoName, dir)
return m.mockFetch(ctx, repoName, dir, progressWriter)
}

return nil, errors.New("no mock for Fetch() is set")
return errors.New("no mock for Fetch() is set")
}

var _ vcssyncer.VCSSyncer = &mockVCSSyncer{}
23 changes: 10 additions & 13 deletions cmd/gitserver/internal/vcssyncer/git.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package vcssyncer

import (
"bytes"
"context"
"io"
"os"
Expand Down Expand Up @@ -162,24 +161,22 @@ func (s *gitRepoSyncer) Clone(ctx context.Context, repo api.RepoName, _ common.G
}

// Fetch tries to fetch updates of a Git repository.
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) ([]byte, error) {
func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) error {
source, err := s.getRemoteURLSource(ctx, repoName)
if err != nil {
return nil, errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
return errors.Wrapf(err, "failed to get remote URL source for %s", repoName)
}

var output bytes.Buffer

// Fetch the remote contents.
{
tryWrite(s.logger, &output, "Fetching remote contents\n")
tryWrite(s.logger, progressWriter, "Fetching remote contents\n")

exitCode, err := s.runFetchCommand(ctx, repoName, source, &output, dir)
exitCode, err := s.runFetchCommand(ctx, repoName, source, progressWriter, dir)
if err != nil {
return nil, errors.Wrapf(err, "exit code: %d, failed to fetch from remote: %s", exitCode, output.String())
return errors.Wrapf(err, "exit code: %d, failed to fetch from remote", exitCode)
}

tryWrite(s.logger, &output, "Fetched remote contents\n")
tryWrite(s.logger, progressWriter, "Fetched remote contents\n")

}

Expand All @@ -189,17 +186,17 @@ func (s *gitRepoSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir co

// Set the local HEAD to the remote HEAD.
{
tryWrite(s.logger, &output, "Setting local HEAD to remote HEAD\n")
tryWrite(s.logger, progressWriter, "Setting local HEAD to remote HEAD\n")

err := s.setHEAD(ctx, repoName, dir, source)
if err != nil {
return nil, errors.Wrap(err, "failed to set local HEAD to remote HEAD")
return errors.Wrap(err, "failed to set local HEAD to remote HEAD")
}

tryWrite(s.logger, &output, "Finished setting local HEAD to remote HEAD\n")
tryWrite(s.logger, progressWriter, "Finished setting local HEAD to remote HEAD\n")
}

return output.Bytes(), nil
return nil
}

func (s *gitRepoSyncer) runFetchCommand(ctx context.Context, repoName api.RepoName, source RemoteURLSource, progressWriter io.Writer, dir common.GitDir) (exitCode int, err error) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/gitserver/internal/vcssyncer/instrumented_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func (i *instrumentedSyncer) Clone(ctx context.Context, repo api.RepoName, targe
return i.base.Clone(ctx, repo, targetDir, tmpPath, progressWriter)
}

func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir) (output []byte, err error) {
func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, dir common.GitDir, progressWriter io.Writer) (err error) {
if !i.shouldObserve() {
return i.base.Fetch(ctx, repoName, dir)
return i.base.Fetch(ctx, repoName, dir, progressWriter)
}

start := time.Now()
Expand All @@ -109,7 +109,7 @@ func (i *instrumentedSyncer) Fetch(ctx context.Context, repoName api.RepoName, d
metricFetchDuration.WithLabelValues(i.formattedTypeLabel, strconv.FormatBool(succeeded)).Observe(duration)
}()

return i.base.Fetch(ctx, repoName, dir)
return i.base.Fetch(ctx, repoName, dir, progressWriter)
}

func (i *instrumentedSyncer) shouldObserve() bool {
Expand Down
46 changes: 23 additions & 23 deletions cmd/gitserver/internal/vcssyncer/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading