Commit 4d4022e

phausler, invalidname, kperryua, and parkera authored
Add a whole bunch more to the guides (#64)
* Rebase guides WIP to avoid source changes
* Add prose for debounce and throttle
* Add an alternative for debounce
* Starts to prose for reductions
* Add effect listings for all types
* Add some examples for reductions
* Update Guides/Channel.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Channel.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Channel.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Channel.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Collections.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Throttle.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Throttle.md Co-authored-by: Chris Adamson <[email protected]>
* Update Guides/Throttle.md Co-authored-by: Chris Adamson <[email protected]>
* Update Sources/AsyncAlgorithms/AsyncInclusiveReductionsSequence.swift Co-authored-by: Chris Adamson <[email protected]>
* Adjust events to elements for channels
* Add a bit more detail to the README and a start to the guide for Task.select
* Chunked updates
* Add a getting started section to the README
* Add some more prose for Channel
* Update Guides/RemoveDuplicates.md Co-authored-by: Chris Adamson <[email protected]>
* Apply suggestions from code review Co-authored-by: Chris Adamson <[email protected]>
* Add some brief prose for Compacted
* reductions could be named scan
* spelling and correctness fixes
* Apply @kperryua's suggestion for changing the intro of Compacted
* Add some brief prose for Timer
* Apply suggestions from code review for README Co-authored-by: Tony Parker <[email protected]>
* Add some brief prose for RemoveDuplicates
* Fleshing out of `Joined`

Co-authored-by: Chris Adamson <[email protected]>
Co-authored-by: Kevin Perry <[email protected]>
Co-authored-by: Tony Parker <[email protected]>
1 parent e87e9dd commit 4d4022e

16 files changed

+1292
-16
lines changed

Guides/Channel.md

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
1+
# Channel
2+
3+
* Author(s): [Philippe Hausler](https://github.com/phausler)
4+
5+
[
6+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChannel.swift),
7+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncThrowingChannel.swift) |
8+
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestChannel.swift)
9+
]
10+
11+
## Introduction
12+
13+
`AsyncStream` introduced a mechanism to send buffered elements from a context that doesn't use Swift concurrency into one that does. That design only addressed a portion of the potential use cases; the missing portion was the back pressure exerted across two concurrency domains.
14+
15+
## Proposed Solution
16+
17+
To achieve a system that supports back pressure and allows for the communication of more than one value from one task to another, we are introducing a new type, the _channel_. The channel will be a reference-type asynchronous sequence with an asynchronous sending capability that awaits the consumption of iteration. Each value sent through the channel, and each terminal event (a finish or a failure), will await the consumption of that value or event by iteration. That awaiting behavior allows back pressure applied at the consumption site to be transmitted to the production site. This means that the rate of production cannot exceed the rate of consumption, and that the rate of consumption cannot exceed the rate of production.
18+
19+
## Detailed Design
20+
21+
Similar to the `AsyncStream` and `AsyncThrowingStream` types, the type for sending elements via back pressure will come in two versions. These two versions will account for the throwing nature or non-throwing nature of the elements being produced.
22+
23+
Each type will have functions to send elements and to send terminal events.
24+
25+
```swift
26+
public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
27+
public struct Iterator: AsyncIteratorProtocol, Sendable {
28+
public mutating func next() async -> Element?
29+
}
30+
31+
public init(element elementType: Element.Type = Element.self)
32+
33+
public func send(_ element: Element) async
34+
public func finish() async
35+
36+
public func makeAsyncIterator() -> Iterator
37+
}
38+
39+
public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
40+
public struct Iterator: AsyncIteratorProtocol, Sendable {
41+
public mutating func next() async throws -> Element?
42+
}
43+
44+
public init(element elementType: Element.Type = Element.self, failure failureType: Failure.Type = Failure.self)
45+
46+
public func send(_ element: Element) async
47+
public func fail(_ error: Error) async where Failure == Error
48+
public func finish() async
49+
50+
public func makeAsyncIterator() -> Iterator
51+
}
52+
```
53+
54+
Channels are intended to be used as communication types between tasks, particularly when one task produces values and another task consumes them. The back pressure applied by `send(_:)`, `fail(_:)`, and `finish()` through suspension and resumption ensures that the production of values does not exceed the consumption of values from iteration. Each of these methods suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made.
55+
56+
```swift
57+
let channel = AsyncChannel<String>()
58+
Task {
59+
while let resultOfLongCalculation = doLongCalculations() {
60+
await channel.send(resultOfLongCalculation)
61+
}
62+
await channel.finish()
63+
}
64+
65+
for await calculationResult in channel {
66+
print(calculationResult)
67+
}
68+
```
69+
70+
The example above uses a task to perform intense calculations; each result is sent to the other task via the `send(_:)` method. That call to `send(_:)` returns when the next iteration of the channel is invoked.
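
The throwing variant can be used the same way. The following is a minimal sketch, assuming the `AsyncAlgorithms` package is available and that `fail(_:)` and `finish()` are `async` as declared in the listing above (if a release makes them synchronous, the `await` is merely redundant). `ParseError` and the input strings are hypothetical and exist only for illustration.

```swift
import AsyncAlgorithms

// Hypothetical error type used only for this illustration.
struct ParseError: Error {}

let channel = AsyncThrowingChannel<Int, Error>()

// Producer: sends parsed values and fails the channel on the first bad input.
Task {
  for text in ["1", "2", "x"] {
    guard let value = Int(text) else {
      await channel.fail(ParseError())
      return
    }
    await channel.send(value)
  }
  await channel.finish()
}

// Consumer: iterates until the channel finishes or throws.
var received: [Int] = []
var failure: Error?
do {
  for try await value in channel {
    received.append(value)
  }
} catch {
  // The error passed to fail(_:) surfaces here.
  failure = error
}
print(received, failure as Any)
```

In this sketch the consumer collects the values sent before the failure, and the loop exits through the `catch` block once the producer calls `fail(_:)`.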
71+
72+
## Alternatives Considered
73+
74+
The use of the name "subject" was considered, due to its heritage as a name for a sync-to-async adapter type.
75+
76+
Making `AsyncChannel` and `AsyncThrowingChannel` actors was considered; however, due to the cancellation internals, these types would then need to create new tasks to handle cancel events. The advantages of an actor in this particular case did not outweigh the impact of adjusting the implementations to be actors.
77+
78+
## Credits/Inspiration
79+
80+
`AsyncChannel` and `AsyncThrowingChannel` were heavily inspired by `Subject`, with the key difference that they use Swift concurrency to apply back pressure.
81+
82+
https://developer.apple.com/documentation/combine/subject/

Guides/Chunked.md

Lines changed: 314 additions & 0 deletions
@@ -0,0 +1,314 @@
1+
# Chunked
2+
3+
* Author(s): [Kevin Perry](https://github.com/kperryua)
4+
5+
[
6+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift),
7+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift),
8+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift),
9+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountAndSignalSequence.swift),
10+
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountSequence.swift) |
11+
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestChunk.swift)
12+
]
13+
14+
## Introduction
15+
16+
Grouping values from an asynchronous sequence is often useful for tasks that involve writing those values efficiently, or for handling specific structured data inputs.
17+
18+
## Proposed Solution
19+
20+
Chunking operations can be broken down into a few distinct categories: grouping according to a binary predicate that determines whether consecutive elements belong to the same group; projecting an element's property to determine the element's chunk membership; chunking by a discrete count; chunking by a separate signal asynchronous sequence that indicates when a chunk should be delimited; or chunking by a combination of count and signal.
21+
22+
### Grouping
23+
24+
Group chunks are determined by passing two consecutive elements to a closure which tests whether they are in the same group. When the `AsyncChunkedByGroupSequence` iterator receives the first element from the base sequence, it will immediately be added to a group. When it receives the second item, it tests whether the previous item and the current item belong to the same group. If they are not in the same group, then the iterator emits the first item's group and a new group is created containing the second item. Items declared to be in the same group accumulate until a new group is declared, or the iterator finds the end of the base sequence. When the base sequence terminates, the final group is emitted. If the base sequence throws an error, `AsyncChunkedByGroupSequence` will rethrow that error immediately and discard any current group.
25+
26+
```swift
27+
extension AsyncSequence {
28+
public func chunked<Collected: RangeReplaceableCollection>(
29+
into: Collected.Type,
30+
by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool
31+
) -> AsyncChunkedByGroupSequence<Self, Collected>
32+
where Collected.Element == Element
33+
34+
public func chunked(
35+
by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool
36+
) -> AsyncChunkedByGroupSequence<Self, [Element]>
37+
}
38+
```
39+
40+
Consider an example where an asynchronous sequence emits the following values: `10, 20, 30, 10, 40, 40, 10, 20`, and the chunked operation is defined as follows:
41+
42+
```swift
43+
let chunks = numbers.chunked { $0 <= $1 }
44+
for await numberChunk in chunks {
45+
print(numberChunk)
46+
}
47+
```
48+
49+
That snippet will produce the following values:
50+
51+
```swift
52+
[10, 20, 30]
53+
[10, 40, 40]
54+
[10, 20]
55+
```
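
The grouping rule described above can be modeled synchronously with nothing but the standard library. This is only a sketch of the rule (compare each element with the previous one, emit the current group when the predicate fails, emit the final group at the end), not the package's implementation; `chunkedModel` is a hypothetical helper name.

```swift
// Synchronous model of the grouping rule; not the library's implementation.
func chunkedModel<Element>(
  _ elements: [Element],
  by belongInSameGroup: (Element, Element) -> Bool
) -> [[Element]] {
  var chunks: [[Element]] = []
  var current: [Element] = []
  for element in elements {
    if let previous = current.last, !belongInSameGroup(previous, element) {
      // The pair is not in the same group: emit the group and start a new one.
      chunks.append(current)
      current = [element]
    } else {
      // First element, or same group as the previous element.
      current.append(element)
    }
  }
  // The base sequence ended: emit the final group, if any.
  if !current.isEmpty { chunks.append(current) }
  return chunks
}

let chunks = chunkedModel([10, 20, 30, 10, 40, 40, 10, 20]) { $0 <= $1 }
print(chunks) // [[10, 20, 30], [10, 40, 40], [10, 20]]
```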
56+
57+
While `Array` is the default type for chunks, thanks to the overload that takes a `RangeReplaceableCollection` type, the same sample can be chunked into instances of `ContiguousArray`, or any other `RangeReplaceableCollection` instead.
58+
59+
```swift
60+
let chunks = numbers.chunked(into: ContiguousArray.self) { $0 <= $1 }
61+
for await numberChunk in chunks {
62+
print(numberChunk)
63+
}
64+
```
65+
66+
The `into:` variant is the funnel method for the main implementation; the overload without `into:` calls it, passing `[Element].self` as the parameter.
67+
68+
### Projection
69+
70+
In some scenarios, chunks are determined not by comparing different elements, but by the element itself. This may be the case when the element has some sort of discriminator that can determine the chunk it belongs to. When two consecutive elements have different projections, the current chunk is emitted and a new chunk is created for the new element.
71+
72+
When the `AsyncChunkedOnProjectionSequence`'s iterator receives `nil` from the base sequence, it emits the final chunk. When the base sequence throws an error, the iterator discards the current chunk and rethrows that error.
73+
74+
Similar to the `chunked(by:)` method, this algorithm optionally takes the `RangeReplaceableCollection` type to use for each chunk.
75+
76+
```swift
77+
extension AsyncSequence {
78+
public func chunked<Subject : Equatable, Collected: RangeReplaceableCollection>(
79+
into: Collected.Type,
80+
on projection: @escaping @Sendable (Element) -> Subject
81+
) -> AsyncChunkedOnProjectionSequence<Self, Subject, Collected>
82+
83+
public func chunked<Subject : Equatable>(
84+
on projection: @escaping @Sendable (Element) -> Subject
85+
) -> AsyncChunkedOnProjectionSequence<Self, Subject, [Element]>
86+
}
87+
```
88+
89+
The following example shows how a sequence of names can be chunked together by their first characters.
90+
91+
```swift
92+
let names = URL(fileURLWithPath: "/tmp/names.txt").lines
93+
let groupedNames = names.chunked(on: \.first!)
94+
for try await (firstLetter, names) in groupedNames {
95+
print(firstLetter)
96+
for name in names {
97+
print(" ", name)
98+
}
99+
}
100+
```
101+
102+
A special property of this kind of projection chunking is that when an asynchronous sequence's elements are known to be ordered, the output of the chunking asynchronous sequence is suitable for initializing dictionaries using the `AsyncSequence` initializer for `Dictionary`. This is because the projection can be easily designed to match the sorting characteristics and thereby guarantee that the output matches the pattern of an array of pairs of unique "keys" with the chunks as the "values".
103+
104+
In the example above, if the names are known to be ordered then you can take advantage of the uniqueness of each "first character" projection to initialize a `Dictionary` like so:
105+
106+
```swift
107+
let names = URL(fileURLWithPath: "/tmp/names.txt").lines
108+
let nameDirectory = try await Dictionary(uniqueKeysWithValues: names.chunked(on: \.first!))
109+
```
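
The ordering requirement can be illustrated with a synchronous, standard-library-only model of projection chunking. This is a sketch, not the package's implementation; `chunkedModel` and the sample names are hypothetical. With ordered input each projected key appears once, whereas unordered input can repeat a key, which makes the output unsuitable for a unique-keys initializer (the standard library's `Dictionary(uniqueKeysWithValues:)` traps on duplicate keys).

```swift
// Synchronous model of projection chunking; not the library's implementation.
func chunkedModel<Element, Subject: Equatable>(
  _ elements: [Element],
  on projection: (Element) -> Subject
) -> [(Subject, [Element])] {
  var chunks: [(Subject, [Element])] = []
  for element in elements {
    let subject = projection(element)
    if let last = chunks.last, last.0 == subject {
      // Same projection as the previous element: extend the current chunk.
      chunks[chunks.count - 1].1.append(element)
    } else {
      // Different projection: start a new chunk keyed by the new subject.
      chunks.append((subject, [element]))
    }
  }
  return chunks
}

let orderedKeys = chunkedModel(["Al", "Amy", "Bob"]) { $0.first! }.map { $0.0 }
let unorderedKeys = chunkedModel(["Al", "Bob", "Amy"]) { $0.first! }.map { $0.0 }
print(orderedKeys)   // ["A", "B"]
print(unorderedKeys) // ["A", "B", "A"]
```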
110+
111+
### Count or Signal
112+
113+
Sometimes chunks are determined not by the elements themselves, but by external factors. This final category enables limiting chunks to a specific size and/or delimiting them by another asynchronous sequence which is referred to as a "signal". This particular chunking family is useful for scenarios where the elements are more efficiently processed as chunks than individual elements, regardless of their values.
114+
115+
This family is broken down into two sub-families of methods: ones that employ a signal plus an optional count (which return an `AsyncChunksOfCountOrSignalSequence`), and the ones that only deal with counts (which return an `AsyncChunksOfCountSequence`). Both sub-families have `Collected` as their element type, or `Array` if unspecified. These sub-families have rethrowing behaviors; if the base `AsyncSequence` can throw then the chunks sequence can also throw. Likewise if the base `AsyncSequence` cannot throw then the chunks sequence also cannot throw.
116+
117+
##### Count only
118+
119+
```swift
120+
extension AsyncSequence {
121+
public func chunks<Collected: RangeReplaceableCollection>(
122+
ofCount count: Int,
123+
into: Collected.Type
124+
) -> AsyncChunksOfCountSequence<Self, Collected>
125+
where Collected.Element == Element
126+
127+
public func chunks(
128+
ofCount count: Int
129+
) -> AsyncChunksOfCountSequence<Self, [Element]>
130+
}
131+
```
132+
133+
If a chunk size limit is specified via an `ofCount` parameter, the sequence will produce chunks of type `Collected` with at most the specified number of elements. When a chunk reaches the given size, the asynchronous sequence will emit it immediately.
134+
135+
For example, an asynchronous sequence of `UInt8` bytes can be chunked into at most 1024-byte `Data` instances like so:
136+
137+
```swift
138+
let packets = bytes.chunks(ofCount: 1024, into: Data.self)
139+
for try await packet in packets {
140+
write(packet)
141+
}
142+
```
143+
144+
##### Signal only
145+
146+
```swift
147+
extension AsyncSequence {
148+
public func chunked<Signal, Collected: RangeReplaceableCollection>(
149+
by signal: Signal,
150+
into: Collected.Type
151+
) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal>
152+
where Collected.Element == Element
153+
154+
public func chunked<Signal>(
155+
by signal: Signal
156+
) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal>
157+
158+
public func chunked<C: Clock, Collected: RangeReplaceableCollection>(
159+
by timer: AsyncTimerSequence<C>,
160+
into: Collected.Type
161+
) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>>
162+
where Collected.Element == Element
163+
164+
public func chunked<C: Clock>(
165+
by timer: AsyncTimerSequence<C>
166+
) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>>
167+
}
168+
```
169+
170+
If a signal asynchronous sequence is specified, the chunking asynchronous sequence emits chunks whenever the signal emits. The signal's element values are ignored. If the chunking asynchronous sequence hasn't accumulated any elements since its previous emission, then no value is emitted in response to the signal.
171+
172+
Since time is a frequent method of signaling desired delineations of chunks, there is a pre-specialized set of overloads that take `AsyncTimerSequence`. These allow shorthand initialization by using `AsyncTimerSequence`'s static member initializers.
173+
174+
As an example, an asynchronous sequence of log messages can be chunked into arrays of logs in four-second segments like so:
175+
176+
```swift
177+
let fourSecondsOfLogs = logs.chunked(by: .repeating(every: .seconds(4)))
178+
for await chunk in fourSecondsOfLogs {
179+
send(chunk)
180+
}
181+
```
182+
183+
##### Count or Signal
184+
185+
```swift
186+
extension AsyncSequence {
187+
public func chunks<Signal, Collected: RangeReplaceableCollection>(
188+
ofCount count: Int,
189+
or signal: Signal,
190+
into: Collected.Type
191+
) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal>
192+
where Collected.Element == Element
193+
194+
public func chunks<Signal>(
195+
ofCount count: Int,
196+
or signal: Signal
197+
) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal>
198+
199+
public func chunks<C: Clock, Collected: RangeReplaceableCollection>(
200+
ofCount count: Int, or timer: AsyncTimerSequence<C>,
201+
into: Collected.Type
202+
) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>>
203+
where Collected.Element == Element
204+
205+
public func chunks<C: Clock>(
206+
ofCount count: Int, or timer: AsyncTimerSequence<C>
207+
) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>>
208+
}
209+
```
210+
211+
If both count and signal are specified, the chunking asynchronous sequence emits chunks whenever *either* the chunk reaches the specified size *or* the signal asynchronous sequence emits. When a signal causes a chunk to be emitted, the accumulated element count is reset back to zero. When an `AsyncTimerSequence` is used as a signal, the timer is started from the moment `next()` is called for the first time on `AsyncChunksOfCountOrSignalSequence`'s iterator, and it emits on a regular cadence from that moment. Note that the scheduling of the timer's emission is unaffected by any chunks emitted based on count.
212+
213+
Like the example above, this code emits up to 1024-byte `Data` instances, but a chunk will also be emitted every second.
214+
215+
```swift
216+
let packets = bytes.chunks(ofCount: 1024, or: .repeating(every: .seconds(1)), into: Data.self)
217+
for try await packet in packets {
218+
write(packet)
219+
}
220+
```
221+
222+
In any configuration of any of the chunking families, when the base asynchronous sequence terminates, one of two things will happen: 1) a partial chunk will be emitted, or 2) no chunk will be emitted (i.e. the iterator received no elements since the emission of the previous chunk). No elements from the base asynchronous sequence are ever discarded, except in the case of a thrown error.
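
The termination behavior can be observed with a small in-memory sequence. This is a minimal sketch, assuming the `AsyncAlgorithms` package is available and that its `async` property on `Sequence` is used to adapt an array into an asynchronous sequence; the sample values are arbitrary.

```swift
import AsyncAlgorithms

// Five elements chunked by two: the last chunk is partial.
let numbers = [1, 2, 3, 4, 5].async

var chunks: [[Int]] = []
for await chunk in numbers.chunks(ofCount: 2) {
  chunks.append(chunk)
}
print(chunks) // [[1, 2], [3, 4], [5]]
```

The trailing `[5]` is the partial chunk emitted when the base sequence terminates; with an even number of elements no extra chunk would follow the last full one.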
223+
224+
## Detailed Design
225+
226+
### Grouping
227+
228+
```swift
229+
public struct AsyncChunkedByGroupSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence
230+
where Collected.Element == Base.Element {
231+
public typealias Element = Collected
232+
233+
public struct Iterator: AsyncIteratorProtocol {
234+
public mutating func next() async rethrows -> Collected?
235+
}
236+
237+
public func makeAsyncIterator() -> Iterator
238+
}
239+
240+
extension AsyncChunkedByGroupSequence: Sendable
241+
where Base: Sendable, Base.Element: Sendable { }
242+
243+
extension AsyncChunkedByGroupSequence.Iterator: Sendable
244+
where Base.AsyncIterator: Sendable, Base.Element: Sendable { }
245+
```
246+
247+
### Projection
248+
249+
```swift
250+
public struct AsyncChunkedOnProjectionSequence<Base: AsyncSequence, Subject: Equatable, Collected: RangeReplaceableCollection>: AsyncSequence where Collected.Element == Base.Element {
251+
public typealias Element = (Subject, Collected)
252+
253+
public struct Iterator: AsyncIteratorProtocol {
254+
public mutating func next() async rethrows -> (Subject, Collected)?
255+
}
256+
257+
public func makeAsyncIterator() -> Iterator
258+
}
259+
260+
extension AsyncChunkedOnProjectionSequence: Sendable
261+
where Base: Sendable, Base.Element: Sendable { }
262+
extension AsyncChunkedOnProjectionSequence.Iterator: Sendable
263+
where Base.AsyncIterator: Sendable, Base.Element: Sendable, Subject: Sendable { }
264+
```
265+
266+
### Count
267+
268+
```swift
269+
public struct AsyncChunksOfCountSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence
270+
where Collected.Element == Base.Element {
271+
public typealias Element = Collected
272+
273+
public struct Iterator: AsyncIteratorProtocol {
274+
public mutating func next() async rethrows -> Collected?
275+
}
276+
277+
public func makeAsyncIterator() -> Iterator
278+
}
279+
280+
extension AsyncChunksOfCountSequence : Sendable where Base : Sendable, Base.Element : Sendable { }
281+
extension AsyncChunksOfCountSequence.Iterator : Sendable where Base.AsyncIterator : Sendable, Base.Element : Sendable { }
282+
283+
```
284+
285+
### Count or Signal
286+
287+
```swift
288+
public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection, Signal: AsyncSequence>: AsyncSequence, Sendable
289+
where
290+
Collected.Element == Base.Element,
291+
Base: Sendable, Signal: Sendable,
292+
Base.AsyncIterator: Sendable, Signal.AsyncIterator: Sendable,
293+
Base.Element: Sendable, Signal.Element: Sendable {
294+
public typealias Element = Collected
295+
296+
public struct Iterator: AsyncIteratorProtocol, Sendable {
297+
public mutating func next() async rethrows -> Collected?
298+
}
299+
300+
public func makeAsyncIterator() -> Iterator
301+
}
302+
```
303+
304+
## Alternatives Considered
305+
306+
Making the chunked element an `AsyncSequence`, instead of allowing collection into a `RangeReplaceableCollection`, was considered; however, it was determined that the throwing behavior of that design would be complex to understand. If that hurdle could be overcome, it might be a future direction worth exploring.
307+
308+
Variants of the `chunked(by:)` (grouping) and `chunked(on:)` (projection) methods could be added that take delimiting `Signal` and `AsyncTimerSequence` inputs similar to `chunks(ofCount:or:)`. However, it was decided that such functionality was likely to be underutilized and not worth the complication to the already broad surface area of `chunked` methods.
309+
310+
Naming this family `collect`, as used in APIs like `Combine`, was considered. This family of functions bears a distinct similarity to those APIs.
311+
312+
## Credits/Inspiration
313+
314+
This transformation function is heavily inspired by the synchronous version [defined in the Swift Algorithms package](https://github.com/apple/swift-algorithms/blob/main/Guides/Chunked.md).
