Skip to content

Attempt to prevent the deadlock in the QueueDiskChannel Test again #18415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 29, 2022
5 changes: 4 additions & 1 deletion modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ loop:
// tell the pool to shutdown.
q.baseCtxCancel()
return
case data := <-q.dataChan:
case data, ok := <-q.dataChan:
if !ok {
return
}
if err := q.PushBack(data); err != nil {
log.Error("Unable to push back data into queue %s", q.name)
}
Expand Down
5 changes: 4 additions & 1 deletion modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
select {
case <-paused:
return nil
case data := <-q.dataChan:
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
Expand Down
69 changes: 48 additions & 21 deletions modules/queue/queue_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -111,7 +112,6 @@ func TestChannelQueue_Pause(t *testing.T) {
if pausable, ok := queue.(Pausable); ok {
pausable.Pause()
}
pushBack = false
lock.Unlock()
return data
}
Expand All @@ -123,7 +123,9 @@ func TestChannelQueue_Pause(t *testing.T) {
}
return nil
}
nilFn := func(_ func()) {}

queueShutdown := []func(){}
queueTerminate := []func(){}

queue, err = NewChannelQueue(handle,
ChannelQueueConfiguration{
Expand All @@ -139,7 +141,34 @@ func TestChannelQueue_Pause(t *testing.T) {
}, &testData{})
assert.NoError(t, err)

go queue.Run(nilFn, nilFn)
go queue.Run(func(shutdown func()) {
lock.Lock()
defer lock.Unlock()
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
lock.Lock()
defer lock.Unlock()
queueTerminate = append(queueTerminate, terminate)
})

// Shutdown and Terminate in defer
defer func() {
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
lock.Lock()
log.Info("Finally terminating")
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}()

test1 := testData{"A", 1}
test2 := testData{"B", 2}
Expand All @@ -155,14 +184,11 @@ func TestChannelQueue_Pause(t *testing.T) {

pausable.Pause()

paused, resumed := pausable.IsPausedIsResumed()
paused, _ := pausable.IsPausedIsResumed()

select {
case <-paused:
case <-resumed:
assert.Fail(t, "Queue should not be resumed")
return
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
Expand All @@ -179,10 +205,11 @@ func TestChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2)

pausable.Resume()
_, resumed := pausable.IsPausedIsResumed()

select {
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}

Expand All @@ -199,47 +226,47 @@ func TestChannelQueue_Pause(t *testing.T) {
pushBack = true
lock.Unlock()

paused, resumed = pausable.IsPausedIsResumed()
_, resumed = pausable.IsPausedIsResumed()

select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed")
return
}

queue.Push(&test1)
paused, _ = pausable.IsPausedIsResumed()

select {
case <-paused:
case <-handleChan:
assert.Fail(t, "handler chan should not contain test1")
return
case <-time.After(500 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "queue should be paused")
return
}

paused, resumed = pausable.IsPausedIsResumed()
lock.Lock()
pushBack = false
lock.Unlock()

paused, _ = pausable.IsPausedIsResumed()

select {
case <-paused:
case <-resumed:
assert.Fail(t, "Queue should not be resumed")
return
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}

pausable.Resume()
_, resumed = pausable.IsPausedIsResumed()

select {
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}

Expand Down
15 changes: 7 additions & 8 deletions modules/queue/queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,13 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
close(q.channelQueue.dataChan)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
Expand Down
Loading