Skip to content

Commit 31e051e

Browse files
committed
Ensure full redirection occurs
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 4bcd53e commit 31e051e

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

modules/queue/queue_disk_channel.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,13 @@ func (q *PersistableChannelQueue) Shutdown() {
313313
q.channelQueue.Wait()
314314
q.internal.(*LevelQueue).Wait()
315315
// Redirect all remaining data in the chan to the internal channel
316-
go func() {
317-
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
318-
for data := range q.channelQueue.dataChan {
319-
_ = q.internal.Push(data)
320-
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
321-
}
322-
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
323-
}()
316+
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
317+
close(q.channelQueue.dataChan)
318+
for data := range q.channelQueue.dataChan {
319+
_ = q.internal.Push(data)
320+
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
321+
}
322+
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
324323

325324
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
326325
}

modules/queue/queue_disk_channel_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
497497
callback()
498498
}
499499
lock.Lock()
500+
log.Info("Finally terminating")
500501
callbacks = make([]func(), len(queueTerminate))
501502
copy(callbacks, queueTerminate)
502503
lock.Unlock()

0 commit comments

Comments
 (0)