@@ -45,7 +45,8 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi
45
45
C : config ,
46
46
T : tunneled ,
47
47
48
- forceUpdates : make (chan struct {}, 1 ),
48
+ forceUpdates : make (chan struct {}, 1 ),
49
+ closeSubscriptions : make (chan * Subscription , maxSubscriptions ),
49
50
50
51
internal : internal ,
51
52
proxies : make (map [uint32 ]* localhostProxy ),
@@ -80,7 +81,8 @@ type Manager struct {
80
81
C ConfigInterace
81
82
T TunneledPortsInterface
82
83
83
- forceUpdates chan struct {}
84
+ forceUpdates chan struct {}
85
+ closeSubscriptions chan * Subscription
84
86
85
87
internal map [uint32 ]struct {}
86
88
proxies map [uint32 ]* localhostProxy
@@ -172,6 +174,10 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
172
174
select {
173
175
case <- pm .forceUpdates :
174
176
forceUpdate = true
177
+ case sub := <- pm .closeSubscriptions :
178
+ pm .mu .Lock ()
179
+ delete (pm .subscriptions , sub )
180
+ pm .mu .Unlock ()
175
181
case exposed = <- exposedUpdates :
176
182
if exposed == nil {
177
183
if ctx .Err () == nil {
@@ -772,14 +778,14 @@ func (pm *Manager) Subscribe() (*Subscription, error) {
772
778
sub := & Subscription {updates : make (chan []* api.PortsStatus , 5 )}
773
779
var once sync.Once
774
780
sub .Close = func () error {
775
- pm .mu .Lock ()
776
- defer pm .mu .Unlock ()
777
-
778
781
once .Do (func () {
779
782
close (sub .updates )
780
783
})
781
- delete (pm .subscriptions , sub )
782
-
784
+ select {
785
+ case pm .closeSubscriptions <- sub :
786
+ default :
787
+ log .Error ("closeSubscriptions channel is full" )
788
+ }
783
789
return nil
784
790
}
785
791
pm .subscriptions [sub ] = struct {}{}
0 commit comments