File tree Expand file tree Collapse file tree 4 files changed +12
-12
lines changed Expand file tree Collapse file tree 4 files changed +12
-12
lines changed Original file line number Diff line number Diff line change @@ -195,9 +195,11 @@ loop:
195
195
}
196
196
}
197
197
198
- var errQueueEmpty = fmt .Errorf ("empty queue" )
199
- var errEmptyBytes = fmt .Errorf ("empty bytes" )
200
- var errUnmarshal = fmt .Errorf ("failed to unmarshal" )
198
+ var (
199
+ errQueueEmpty = fmt .Errorf ("empty queue" )
200
+ errEmptyBytes = fmt .Errorf ("empty bytes" )
201
+ errUnmarshal = fmt .Errorf ("failed to unmarshal" )
202
+ )
201
203
202
204
func (q * ByteFIFOQueue ) doPop () error {
203
205
q .lock .Lock ()
Original file line number Diff line number Diff line change @@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() {
251
251
q .channelQueue .Wait ()
252
252
q .internal .(* LevelQueue ).Wait ()
253
253
// Redirect all remaining data in the chan to the internal channel
254
- close (q .channelQueue .dataChan )
255
254
log .Trace ("PersistableChannelQueue: %s Redirecting remaining data" , q .delayedStarter .name )
255
+ close (q .channelQueue .dataChan )
256
256
for data := range q .channelQueue .dataChan {
257
257
_ = q .internal .Push (data )
258
258
atomic .AddInt64 (& q .channelQueue .numInQueue , - 1 )
Original file line number Diff line number Diff line change @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
188
188
for _ , callback := range callbacks {
189
189
callback ()
190
190
}
191
-
192
191
}
Original file line number Diff line number Diff line change @@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
238
238
q .channelQueue .Wait ()
239
239
q .internal .(* LevelUniqueQueue ).Wait ()
240
240
// Redirect all remaining data in the chan to the internal channel
241
- go func () {
242
- log .Trace ("PersistableChannelUniqueQueue: %s Redirecting remaining data" , q .delayedStarter .name )
243
- for data := range q .channelQueue .dataChan {
244
- _ = q .internal .Push (data )
245
- }
246
- log .Trace ("PersistableChannelUniqueQueue: %s Done Redirecting remaining data" , q .delayedStarter .name )
247
- }()
241
+ close (q .channelQueue .dataChan )
242
+ log .Trace ("PersistableChannelUniqueQueue: %s Redirecting remaining data" , q .delayedStarter .name )
243
+ for data := range q .channelQueue .dataChan {
244
+ _ = q .internal .Push (data )
245
+ }
246
+ log .Trace ("PersistableChannelUniqueQueue: %s Done Redirecting remaining data" , q .delayedStarter .name )
248
247
249
248
log .Debug ("PersistableChannelUniqueQueue: %s Shutdown" , q .delayedStarter .name )
250
249
}
You can’t perform that action at this time.
0 commit comments