|
9 | 9 | "encoding/json"
|
10 | 10 | "fmt"
|
11 | 11 | "sync"
|
12 |
| - "sync/atomic" |
13 | 12 | "time"
|
14 | 13 |
|
15 | 14 | "code.gitea.io/gitea/modules/log"
|
@@ -81,6 +80,8 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
|
81 | 80 |
|
82 | 81 | // IsEmpty checks if the queue is empty
|
83 | 82 | func (q *ByteFIFOQueue) IsEmpty() bool {
|
| 83 | + q.lock.Lock() |
| 84 | + defer q.lock.Unlock() |
84 | 85 | if !q.WorkerPool.IsEmpty() {
|
85 | 86 | return false
|
86 | 87 | }
|
@@ -119,32 +120,32 @@ func (q *ByteFIFOQueue) readToChan() {
|
119 | 120 | q.cancel()
|
120 | 121 | return
|
121 | 122 | default:
|
122 |
| - atomic.AddInt64(&q.numInQueue, 1) |
| 123 | + q.lock.Lock() |
123 | 124 | bs, err := q.byteFIFO.Pop()
|
124 | 125 | if err != nil {
|
| 126 | + q.lock.Unlock() |
125 | 127 | log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
|
126 |
| - atomic.AddInt64(&q.numInQueue, -1) |
127 | 128 | time.Sleep(time.Millisecond * 100)
|
128 | 129 | continue
|
129 | 130 | }
|
130 | 131 |
|
131 | 132 | if len(bs) == 0 {
|
132 |
| - atomic.AddInt64(&q.numInQueue, -1) |
| 133 | + q.lock.Unlock() |
133 | 134 | time.Sleep(time.Millisecond * 100)
|
134 | 135 | continue
|
135 | 136 | }
|
136 | 137 |
|
137 | 138 | data, err := unmarshalAs(bs, q.exemplar)
|
138 | 139 | if err != nil {
|
139 | 140 | log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
|
140 |
| - atomic.AddInt64(&q.numInQueue, -1) |
| 141 | + q.lock.Unlock() |
141 | 142 | time.Sleep(time.Millisecond * 100)
|
142 | 143 | continue
|
143 | 144 | }
|
144 | 145 |
|
145 | 146 | log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
|
146 | 147 | q.WorkerPool.Push(data)
|
147 |
| - atomic.AddInt64(&q.numInQueue, -1) |
| 148 | + q.lock.Unlock() |
148 | 149 | }
|
149 | 150 | }
|
150 | 151 | }
|
@@ -222,6 +223,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
|
222 | 223 | if err != nil {
|
223 | 224 | return false, err
|
224 | 225 | }
|
225 |
| - |
226 | 226 | return q.byteFIFO.(UniqueByteFIFO).Has(bs)
|
227 | 227 | }
|
0 commit comments