Skip to content

First pass at some documentation for AsyncBufferSequence #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Sources/AsyncAlgorithms/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,33 @@ actor AsyncBufferState<Input: Sendable, Output: Sendable> {
}
}

/// An asynchronous buffer storage actor protocol used for buffering
/// elements to an `AsyncBufferSequence`.
@rethrows
public protocol AsyncBuffer: Actor {
associatedtype Input: Sendable
associatedtype Output: Sendable

/// Push an element to enqueue to the buffer
func push(_ element: Input) async

/// Pop an element from the buffer.
///
/// Implementors of `pop()` may throw. In cases where types
/// throw from this function, that throwing behavior contributes to
/// the rethrowing characteristics of `AsyncBufferSequence`.
func pop() async throws -> Output?
}

/// A buffer that limits pushed items by a certain count.
public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
/// A policy for buffering elements to an `AsyncLimitBuffer`
public enum Policy: Sendable {
/// A policy for no bounding limit of pushed elements.
case unbounded
/// A policy for limiting to a specific number of oldest values.
case bufferingOldest(Int)
/// A policy for limiting to a specific number of newest values.
case bufferingNewest(Int)
}

Expand All @@ -154,6 +168,7 @@ public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
self.policy = policy
}

/// Push an element to enqueue to the buffer.
public func push(_ element: Element) async {
switch policy {
case .unbounded:
Expand All @@ -174,6 +189,7 @@ public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
}
}

/// Pop an element from the buffer.
public func pop() async -> Element? {
guard buffer.count > 0 else {
return nil
Expand All @@ -183,17 +199,31 @@ public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
}

extension AsyncSequence where Element: Sendable {
/// Creates an asynchronous sequence that buffers elements using a buffer created from a supplied closure.
///
/// Use the `buffer(_:)` method to account for `AsyncSequence` types that may produce elements faster
/// than they are iterated. The `createBuffer` closure returns a backing buffer for storing elements and dealing with
/// behavioral charcteristics of the `buffer(_:)` algorithm.
///
/// - Parameter createBuffer: A closure that constructs a new `AsyncBuffer` actor to store buffered values.
/// - Returns: An asynchronous sequence that buffers elements using the specified `AsyncBuffer`.
public func buffer<Buffer: AsyncBuffer>(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence<Self, Buffer> where Buffer.Input == Element {
AsyncBufferSequence(self, createBuffer: createBuffer)
}

/// Creates an asynchronous sequence that buffers elements using a specific policy to limit the number of
/// elements that are buffered.
///
/// - Parameter policy: A limiting policy behavior on the buffering behavior of the `AsyncBufferSequence`
/// - Returns: An asynchronous sequence that buffers elements up to a given limit.
public func buffer(policy limit: AsyncLimitBuffer<Element>.Policy) -> AsyncBufferSequence<Self, AsyncLimitBuffer<Element>> {
buffer {
AsyncLimitBuffer(policy: limit)
}
}
}

/// An `AsyncSequence` that buffers elements utilizing an `AsyncBuffer`.
public struct AsyncBufferSequence<Base: AsyncSequence, Buffer: AsyncBuffer> where Base.Element == Buffer.Input, Base.AsyncIterator: Sendable {
let base: Base
let createBuffer: @Sendable () -> Buffer
Expand All @@ -210,6 +240,7 @@ extension AsyncBufferSequence.Iterator: Sendable where Base: Sendable, Base.Asyn
extension AsyncBufferSequence: AsyncSequence {
public typealias Element = Buffer.Output

/// The iterator for a `AsyncBufferSequence` instance.
public struct Iterator: AsyncIteratorProtocol {
struct Active {
var task: Task<Void, Never>?
Expand Down