Skip to content

Commit a720018

Browse files
committed
Fix lock issue
1 parent 81ecbce commit a720018

File tree

3 files changed

+12
-16
lines changed

3 files changed

+12
-16
lines changed

components/supervisor/pkg/ports/ports.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ type managedPort struct {
127127
// Subscription is a Subscription to status updates
128128
type Subscription struct {
129129
updates chan []*api.PortsStatus
130-
Close func() error
130+
Close func(lock bool) error
131131
}
132132

133133
// Updates returns the updates channel
@@ -153,7 +153,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
153153
pm.mu.Unlock()
154154

155155
for _, s := range subs {
156-
_ = s.Close()
156+
_ = s.Close(true)
157157
}
158158
}()
159159
defer cancel()
@@ -174,10 +174,6 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
174174
select {
175175
case <-pm.forceUpdates:
176176
forceUpdate = true
177-
case sub := <-pm.closeSubscriptions:
178-
pm.mu.Lock()
179-
delete(pm.subscriptions, sub)
180-
pm.mu.Unlock()
181177
case exposed = <-exposedUpdates:
182178
if exposed == nil {
183179
if ctx.Err() == nil {
@@ -330,7 +326,7 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve
330326
case sub.updates <- status:
331327
case <-time.After(5 * time.Second):
332328
log.Error("ports subscription droped out")
333-
_ = sub.Close()
329+
_ = sub.Close(false)
334330
}
335331
}
336332
}
@@ -778,15 +774,15 @@ func (pm *Manager) Subscribe() (*Subscription, error) {
778774

779775
sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)}
780776
var once sync.Once
781-
sub.Close = func() error {
777+
sub.Close = func(lock bool) error {
778+
if lock {
779+
pm.mu.Lock()
780+
defer pm.mu.Unlock()
781+
}
782782
once.Do(func() {
783783
close(sub.updates)
784784
})
785-
select {
786-
case pm.closeSubscriptions <- sub:
787-
default:
788-
log.Error("closeSubscriptions channel is full")
789-
}
785+
delete(pm.subscriptions, sub)
790786
return nil
791787
}
792788
pm.subscriptions[sub] = struct{}{}

components/supervisor/pkg/ports/ports_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ func TestPortsUpdateState(t *testing.T) {
684684
}
685685
go func() {
686686
defer wg.Done()
687-
defer sub.Close()
687+
defer sub.Close(true)
688688

689689
for up := range sub.Updates() {
690690
updts = append(updts, up)
@@ -878,7 +878,7 @@ func TestPortsConcurrentSubscribe(t *testing.T) {
878878
// update
879879
case <-sub.Updates():
880880
}
881-
sub.Close()
881+
sub.Close(true)
882882
}
883883
return nil
884884
})

components/supervisor/pkg/supervisor/services.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (s *statusService) PortsStatus(req *api.PortsStatusRequest, srv api.StatusS
243243
if err != nil {
244244
return status.Error(codes.Internal, err.Error())
245245
}
246-
defer sub.Close()
246+
defer sub.Close(true)
247247

248248
for {
249249
select {

0 commit comments

Comments
 (0)