Skip to content

Commit 888c148

Browse files
committed
More nicely handle a closed datachan
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 31e051e commit 888c148

File tree

4 files changed

+18
-10
lines changed

4 files changed

+18
-10
lines changed

modules/queue/queue_bytefifo.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ loop:
205205
// tell the pool to shutdown.
206206
q.baseCtxCancel()
207207
return
208-
case data := <-q.dataChan:
208+
case data, ok := <-q.dataChan:
209+
if !ok {
210+
return
211+
}
209212
if err := q.PushBack(data); err != nil {
210213
log.Error("Unable to push back data into queue %s", q.name)
211214
}

modules/queue/queue_channel.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
117117
select {
118118
case <-paused:
119119
return nil
120-
case data := <-q.dataChan:
120+
case data, ok := <-q.dataChan:
121+
if !ok {
122+
return nil
123+
}
121124
if unhandled := q.handle(data); unhandled != nil {
122125
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
123126
}

modules/queue/unique_queue_channel.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
178178
default:
179179
}
180180
select {
181-
case data := <-q.dataChan:
181+
case data, ok := <-q.dataChan:
182+
if !ok {
183+
return nil
184+
}
182185
if unhandled := q.handle(data); unhandled != nil {
183186
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
184187
}

modules/queue/unique_queue_disk_channel.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
282282
q.channelQueue.Wait()
283283
q.internal.(*LevelUniqueQueue).Wait()
284284
// Redirect all remaining data in the chan to the internal channel
285-
go func() {
286-
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
287-
for data := range q.channelQueue.dataChan {
288-
_ = q.internal.Push(data)
289-
}
290-
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
291-
}()
285+
close(q.channelQueue.dataChan)
286+
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
287+
for data := range q.channelQueue.dataChan {
288+
_ = q.internal.Push(data)
289+
}
290+
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
292291

293292
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
294293
}

0 commit comments

Comments
 (0)