@@ -2,6 +2,8 @@ package internal
2
2
3
3
import (
4
4
"context"
5
+ "sync"
6
+ "time"
5
7
6
8
"github.com/sourcegraph/log"
7
9
"google.golang.org/grpc/codes"
@@ -26,6 +28,7 @@ func NewRepositoryServiceServer(server *Server, config *GRPCRepositoryServiceCon
26
28
hostname : server .hostname ,
27
29
svc : server ,
28
30
fs : server .fs ,
31
+ locker : server .locker ,
29
32
}
30
33
31
34
if config .ExhaustiveRequestLoggingEnabled {
@@ -46,6 +49,7 @@ type repositoryServiceServer struct {
46
49
hostname string
47
50
fs gitserverfs.FS
48
51
svc service
52
+ locker RepositoryLocker
49
53
50
54
proto.UnimplementedGitserverRepositoryServiceServer
51
55
}
@@ -78,20 +82,57 @@ func (s *repositoryServiceServer) DeleteRepository(ctx context.Context, req *pro
78
82
return & proto.DeleteRepositoryResponse {}, nil
79
83
}
80
84
81
- func (s * repositoryServiceServer ) FetchRepository (ctx context. Context , req * proto.FetchRepositoryRequest ) ( * proto.FetchRepositoryResponse , error ) {
85
+ func (s * repositoryServiceServer ) FetchRepository (req * proto.FetchRepositoryRequest , ss proto.GitserverRepositoryService_FetchRepositoryServer ) error {
82
86
if req .GetRepoName () == "" {
83
- return nil , status .New (codes .InvalidArgument , "repo_name must be specified" ).Err ()
87
+ return status .New (codes .InvalidArgument , "repo_name must be specified" ).Err ()
84
88
}
85
89
86
90
repoName := api .RepoName (req .GetRepoName ())
87
91
88
- lastFetched , lastChanged , err := s .svc .FetchRepository (ctx , repoName )
92
+ var wg sync.WaitGroup
93
+ wg .Add (1 )
94
+ done := make (chan struct {})
95
+
96
+ go func () {
97
+ defer wg .Done ()
98
+
99
+ for {
100
+ select {
101
+ case <- done :
102
+ return
103
+ case <- time .After (time .Second ):
104
+ }
105
+ status , locked := s .locker .Status (repoName )
106
+ if locked {
107
+ err := ss .Send (& proto.FetchRepositoryResponse {
108
+ Payload : & proto.FetchRepositoryResponse_Progress {
109
+ Progress : & proto.FetchRepositoryResponse_FetchProgress {
110
+ Output : []byte (status ),
111
+ },
112
+ },
113
+ })
114
+ if err != nil {
115
+ s .logger .Error ("failed to send progress event" , log .Error (err ), log .String ("repo" , string (repoName )))
116
+ }
117
+ }
118
+ }
119
+
120
+ }()
121
+
122
+ lastFetched , lastChanged , err := s .svc .FetchRepository (ss .Context (), repoName )
89
123
if err != nil {
90
- return nil , status .New (codes .Internal , errors .Wrap (err , "failed to fetch repository" ).Error ()).Err ()
124
+ return status .New (codes .Internal , errors .Wrap (err , "failed to fetch repository" ).Error ()).Err ()
91
125
}
92
126
93
- return & proto.FetchRepositoryResponse {
94
- LastFetched : timestamppb .New (lastFetched ),
95
- LastChanged : timestamppb .New (lastChanged ),
96
- }, nil
127
+ close (done )
128
+ wg .Wait ()
129
+
130
+ return ss .Send (& proto.FetchRepositoryResponse {
131
+ Payload : & proto.FetchRepositoryResponse_Done {
132
+ Done : & proto.FetchRepositoryResponse_FetchDone {
133
+ LastFetched : timestamppb .New (lastFetched ),
134
+ LastChanged : timestamppb .New (lastChanged ),
135
+ },
136
+ },
137
+ })
97
138
}
0 commit comments