-
-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Add Unique Queue infrastructure and move TestPullRequests to this #9856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
8eb4ce3
Upgrade levelqueue to version 0.2.0
zeripath ca14f21
Add UniqueQueue interface and functions to create them
zeripath 22c4563
Add UniqueQueue implementations
zeripath 92c2207
Move TestPullRequests over to use UniqueQueue
zeripath e6bd896
fix revive
zeripath 7b967e0
Reduce code duplication
zeripath 4b278ce
Merge remote-tracking branch 'origin/master' into unique-queues
zeripath 486a8e0
Merge remote-tracking branch 'origin/master' into unique-queues
zeripath 307f18a
Add bytefifos
zeripath 028872e
fix locking in persistablechanneluniquequeue shutdown
zeripath 8f6f779
rename queue pr_patch_checker
zeripath 2d88ad7
Ensure invalid types are logged
zeripath 23e7017
Move body of ByteFIFOQueue terminate out of select
zeripath a2fb188
Fix close race in PersistableChannelQueue Shutdown
zeripath f53c675
Fix double lock in unique wrapped
zeripath c54e2e4
Simplify PushFunc in Unique wrapped slightly
zeripath d82abd9
rename q fifo in queue_disk and handle not found err in unique_queue_…
zeripath 47d94a1
Lock for Empty and readToChan
zeripath 343cded
Merge branch 'master' into unique-queues
zeripath 5ab601b
Merge branch 'master' into unique-queues
zeripath 25c1427
Add some more comments
zeripath 111a87e
Merge branch 'master' into unique-queues
zeripath 6b368d6
Merge branch 'master' into unique-queues
zeripath File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package queue | ||
|
||
// ByteFIFO defines a FIFO that takes a byte array | ||
type ByteFIFO interface { | ||
// Len returns the length of the fifo | ||
Len() int64 | ||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added | ||
PushFunc(data []byte, fn func() error) error | ||
// Pop pops data from the start of the fifo | ||
Pop() ([]byte, error) | ||
// Close this fifo | ||
Close() error | ||
} | ||
|
||
// UniqueByteFIFO defines a FIFO that Uniques its contents | ||
type UniqueByteFIFO interface { | ||
ByteFIFO | ||
// Has returns whether the fifo contains this data | ||
Has(data []byte) (bool, error) | ||
} | ||
|
||
var _ (ByteFIFO) = &DummyByteFIFO{} | ||
|
||
// DummyByteFIFO represents a dummy fifo | ||
type DummyByteFIFO struct{} | ||
|
||
// PushFunc returns nil | ||
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { | ||
return nil | ||
} | ||
|
||
// Pop returns nil | ||
func (*DummyByteFIFO) Pop() ([]byte, error) { | ||
return []byte{}, nil | ||
} | ||
|
||
// Close returns nil | ||
func (*DummyByteFIFO) Close() error { | ||
return nil | ||
} | ||
|
||
// Len is always 0 | ||
func (*DummyByteFIFO) Len() int64 { | ||
return 0 | ||
} | ||
|
||
var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{} | ||
|
||
// DummyUniqueByteFIFO represents a dummy unique fifo | ||
type DummyUniqueByteFIFO struct { | ||
DummyByteFIFO | ||
} | ||
|
||
// Has always returns false | ||
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { | ||
return false, nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package queue | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"code.gitea.io/gitea/modules/log" | ||
) | ||
|
||
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue | ||
type ByteFIFOQueueConfiguration struct { | ||
WorkerPoolConfiguration | ||
Workers int | ||
Name string | ||
} | ||
|
||
var _ (Queue) = &ByteFIFOQueue{} | ||
|
||
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool | ||
type ByteFIFOQueue struct { | ||
*WorkerPool | ||
byteFIFO ByteFIFO | ||
typ Type | ||
closed chan struct{} | ||
terminated chan struct{} | ||
exemplar interface{} | ||
workers int | ||
name string | ||
lock sync.Mutex | ||
} | ||
|
||
// NewByteFIFOQueue creates a new ByteFIFOQueue | ||
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) { | ||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config := configInterface.(ByteFIFOQueueConfiguration) | ||
|
||
return &ByteFIFOQueue{ | ||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), | ||
byteFIFO: byteFIFO, | ||
typ: typ, | ||
closed: make(chan struct{}), | ||
terminated: make(chan struct{}), | ||
exemplar: exemplar, | ||
workers: config.Workers, | ||
name: config.Name, | ||
}, nil | ||
} | ||
|
||
// Name returns the name of this queue | ||
func (q *ByteFIFOQueue) Name() string { | ||
return q.name | ||
} | ||
|
||
// Push pushes data to the fifo | ||
func (q *ByteFIFOQueue) Push(data Data) error { | ||
return q.PushFunc(data, nil) | ||
} | ||
|
||
// PushFunc pushes data to the fifo | ||
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { | ||
if !assignableTo(data, q.exemplar) { | ||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) | ||
} | ||
bs, err := json.Marshal(data) | ||
if err != nil { | ||
return err | ||
} | ||
return q.byteFIFO.PushFunc(bs, fn) | ||
} | ||
|
||
// IsEmpty checks if the queue is empty | ||
func (q *ByteFIFOQueue) IsEmpty() bool { | ||
q.lock.Lock() | ||
defer q.lock.Unlock() | ||
if !q.WorkerPool.IsEmpty() { | ||
return false | ||
} | ||
return q.byteFIFO.Len() == 0 | ||
} | ||
|
||
// Run runs the bytefifo queue | ||
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { | ||
atShutdown(context.Background(), q.Shutdown) | ||
atTerminate(context.Background(), q.Terminate) | ||
log.Debug("%s: %s Starting", q.typ, q.name) | ||
|
||
go func() { | ||
_ = q.AddWorkers(q.workers, 0) | ||
}() | ||
|
||
go q.readToChan() | ||
|
||
log.Trace("%s: %s Waiting til closed", q.typ, q.name) | ||
<-q.closed | ||
log.Trace("%s: %s Waiting til done", q.typ, q.name) | ||
q.Wait() | ||
|
||
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
atTerminate(ctx, cancel) | ||
q.CleanUp(ctx) | ||
cancel() | ||
} | ||
|
||
func (q *ByteFIFOQueue) readToChan() { | ||
for { | ||
select { | ||
case <-q.closed: | ||
// tell the pool to shutdown. | ||
q.cancel() | ||
return | ||
default: | ||
q.lock.Lock() | ||
bs, err := q.byteFIFO.Pop() | ||
if err != nil { | ||
q.lock.Unlock() | ||
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
if len(bs) == 0 { | ||
q.lock.Unlock() | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
data, err := unmarshalAs(bs, q.exemplar) | ||
if err != nil { | ||
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) | ||
q.lock.Unlock() | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) | ||
q.WorkerPool.Push(data) | ||
q.lock.Unlock() | ||
} | ||
} | ||
} | ||
|
||
// Shutdown processing from this queue | ||
func (q *ByteFIFOQueue) Shutdown() { | ||
log.Trace("%s: %s Shutting down", q.typ, q.name) | ||
q.lock.Lock() | ||
select { | ||
case <-q.closed: | ||
default: | ||
close(q.closed) | ||
} | ||
q.lock.Unlock() | ||
log.Debug("%s: %s Shutdown", q.typ, q.name) | ||
} | ||
|
||
// Terminate this queue and close the queue | ||
func (q *ByteFIFOQueue) Terminate() { | ||
log.Trace("%s: %s Terminating", q.typ, q.name) | ||
q.Shutdown() | ||
q.lock.Lock() | ||
select { | ||
case <-q.terminated: | ||
q.lock.Unlock() | ||
return | ||
default: | ||
} | ||
close(q.terminated) | ||
q.lock.Unlock() | ||
if log.IsDebug() { | ||
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) | ||
} | ||
if err := q.byteFIFO.Close(); err != nil { | ||
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) | ||
} | ||
log.Debug("%s: %s Terminated", q.typ, q.name) | ||
} | ||
|
||
var _ (UniqueQueue) = &ByteFIFOUniqueQueue{} | ||
|
||
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo | ||
type ByteFIFOUniqueQueue struct { | ||
ByteFIFOQueue | ||
} | ||
|
||
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue | ||
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) { | ||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
config := configInterface.(ByteFIFOQueueConfiguration) | ||
|
||
return &ByteFIFOUniqueQueue{ | ||
ByteFIFOQueue: ByteFIFOQueue{ | ||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), | ||
byteFIFO: byteFIFO, | ||
typ: typ, | ||
closed: make(chan struct{}), | ||
terminated: make(chan struct{}), | ||
exemplar: exemplar, | ||
workers: config.Workers, | ||
name: config.Name, | ||
}, | ||
}, nil | ||
} | ||
|
||
// Has checks if the provided data is in the queue | ||
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { | ||
if !assignableTo(data, q.exemplar) { | ||
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) | ||
} | ||
bs, err := json.Marshal(data) | ||
if err != nil { | ||
return false, err | ||
} | ||
return q.byteFIFO.(UniqueByteFIFO).Has(bs) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.