|
| 1 | +# Chunked |
| 2 | + |
| 3 | +* Author(s): [Kevin Perry](https://github.com/kperryua) |
| 4 | + |
| 5 | +[ |
| 6 | +[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift), |
| 7 | +[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift), |
| 8 | +[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift), |
| 9 | +[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountAndSignalSequence.swift), |
| 10 | +[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountSequence.swift) | |
| 11 | +[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestChunk.swift) |
| 12 | +] |
| 13 | + |
| 14 | +## Introduction |
| 15 | + |
| 16 | +Grouping of values from an asynchronous sequence is often useful for tasks that involve writing those values effeciently or useful to handle specific structured data inputs. |
| 17 | + |
| 18 | +## Proposed Solution |
| 19 | + |
| 20 | +Chunking operations can be broken down into a few distinct categories: grouping by some sort of predicate determining if elements belong to the same group, projecting a property to determine the element's chunk, or by an optional discrete count in potential combination with a timed signal indicating when the chunk should be delimited. |
| 21 | + |
| 22 | +### Grouping |
| 23 | + |
| 24 | +Chunking by group can be determined by passing two elements to determine if they are in the same group. The first element awaited by iteration of a `AsyncChunkedByGroupSequence` will immediately be in a group, the second item will test that previous item along with the current one to determine if they belong to the same group. If they are not in the same group then the first item's group is emitted. Elsewise it will continue on until a new group is determined or the end of the sequence is reached. If an error is thrown during iteration of the base it will rethrow that error immediately and terminate any current grouping. |
| 25 | + |
| 26 | +```swift |
| 27 | +extension AsyncSequence { |
| 28 | + public func chunked<Collected: RangeReplaceableCollection>( |
| 29 | + into: Collected.Type, |
| 30 | + by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool |
| 31 | + ) -> AsyncChunkedByGroupSequence<Self, Collected> |
| 32 | + where Collected.Element == Element |
| 33 | + |
| 34 | + public func chunked( |
| 35 | + by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool |
| 36 | + ) -> AsyncChunkedByGroupSequence<Self, [Element]> |
| 37 | +} |
| 38 | +``` |
| 39 | + |
| 40 | +Consider an example where an async sequence emits the following values: `10, 20, 30, 10, 40, 40, 10, 20`. Given the chunked operation to be defined as follows: |
| 41 | + |
| 42 | +```swift |
| 43 | +let chunks = numbers.chunked { $0 <= $1 } |
| 44 | +for await numberChunk in chunks { |
| 45 | + print(numberChunk) |
| 46 | +} |
| 47 | +``` |
| 48 | + |
| 49 | +That snippet will produce the following values: |
| 50 | + |
| 51 | +```swift |
| 52 | +[10, 20, 30] |
| 53 | +[10, 40, 40] |
| 54 | +[10, 20] |
| 55 | +``` |
| 56 | + |
| 57 | +That same sample could also be expressed as chunking into `ContiguousArray` types instead of `Array`. |
| 58 | + |
| 59 | +```swift |
| 60 | +let chunks = numbers.chunked(into: ContiguousArray.self) { $0 <= $1 } |
| 61 | +for await numberChunk in chunks { |
| 62 | + print(numberChunk) |
| 63 | +} |
| 64 | +``` |
| 65 | + |
| 66 | +That variant is the funnel method for the default implementation that passes `Array<Element>.self` in as the parameter. |
| 67 | + |
| 68 | +### Projected Seperator |
| 69 | + |
| 70 | +Other scenarios can determine the grouping behavior by the element itself. This is often times when the element contains some sort of descriminator about the grouping it belongs to. |
| 71 | + |
| 72 | +Similarly to the `chunked(by:)` API this algorithm has an optional specification for the `RangeReplacableCollection` that the chunks are comprised of. This means that other collection types other than just `Array` can be used to "packetize" the elements. |
| 73 | + |
| 74 | +When the base asynchronous sequence being iterated by `AsyncChunkedOnProjectionSequence` throws the iteration of the `AsyncChunkedOnProjectionSequence` rethrows that error. When the end of iteration occurs via returning nil from the iteration the iteration of the `AsyncChunkedOnProjectionSequence` then will return the final collected chunk. |
| 75 | + |
| 76 | +```swift |
| 77 | +extension AsyncSequence { |
| 78 | + public func chunked<Subject : Equatable, Collected: RangeReplaceableCollection>( |
| 79 | + into: Collected.Type, |
| 80 | + on projection: @escaping @Sendable (Element) -> Subject |
| 81 | + ) -> AsyncChunkedOnProjectionSequence<Self, Subject, Collected> |
| 82 | + |
| 83 | + public func chunked<Subject : Equatable>( |
| 84 | + on projection: @escaping @Sendable (Element) -> Subject |
| 85 | + ) -> AsyncChunkedOnProjectionSequence<Self, Subject, [Element]> |
| 86 | +} |
| 87 | +``` |
| 88 | + |
| 89 | +Chunked asynchronous sequences on grouping can give iterative categorization or in the cases where it is known ordered elements suitable uniqueness for initializing dictionaries via the `AsyncSequence` initializer for `Dictionary`. |
| 90 | + |
| 91 | +```swift |
| 92 | +let names = URL(fileURLWithPath: "/tmp/names.txt").lines |
| 93 | +let groupedNames = names.chunked(on: \.first!) |
| 94 | +for try await (firstLetter, names) in groupedNames { |
| 95 | + print(firstLetter) |
| 96 | + for name in names { |
| 97 | + print(" ", name) |
| 98 | + } |
| 99 | +} |
| 100 | +``` |
| 101 | + |
| 102 | +In the example above, if the names are known to be ordered then the uniqueness can be passed to `Dictionary.init(uniqueKeysWithValues:)`. |
| 103 | + |
| 104 | +```swift |
| 105 | +let names = URL(fileURLWithPath: "/tmp/names.txt").lines |
| 106 | +let nameDirectory = try await Dictionary(uniqueKeysWithValues: names.chunked(on: \.first!)) |
| 107 | +``` |
| 108 | + |
| 109 | +### Count or Signal |
| 110 | + |
| 111 | +The final category is to either delimit chunks of a specific count/size or by a signal. This particular transform family is useful for packetization where the packets being used are more effeciently handled as batches than individual elements. |
| 112 | + |
| 113 | +This family is broken down into two sub-familes of methods. Ones that can transact upon a count or signal (which return a `AsyncChunksOfCountOrSignalSequence`), and the ones who only deal with counts (which return a `AsyncChunksOfCountSequence`). Both sub-familes have similar properties with the regards to the element they are producing; they both have the `Collected` as their element type. By default the produced element type is an array of the base asynchronous sequence's element. Iterating these sub-families have rethrowing behaviors; if the base `AsyncSequence` throws then the chunks sequence throws as well, likewise if the base `AsyncSequence` does not throw then the chunks sequence does not throw. |
| 114 | + |
| 115 | +Any limitation upon the count of via the `ofCount` variants will produce `Collected` elements with at most the specified number of elements. At termination of these the final collected elements may be less than the specified count. |
| 116 | + |
| 117 | +Since time is a critical method of signaling specific deliniations of chunks there is a pre-specialized variant of those methods for signals. These allow shorthand initialization via the static member initializers. |
| 118 | + |
| 119 | +```swift |
| 120 | +extension AsyncSequence { |
| 121 | + public func chunks<Signal, Collected: RangeReplaceableCollection>( |
| 122 | + ofCount count: Int, |
| 123 | + or signal: Signal, |
| 124 | + into: Collected.Type |
| 125 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal> |
| 126 | + where Collected.Element == Element |
| 127 | + |
| 128 | + public func chunks<Signal>( |
| 129 | + ofCount count: Int, |
| 130 | + or signal: Signal |
| 131 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal> |
| 132 | + |
| 133 | + public func chunked<Signal, Collected: RangeReplaceableCollection>( |
| 134 | + by signal: Signal, |
| 135 | + into: Collected.Type |
| 136 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal> |
| 137 | + where Collected.Element == Element |
| 138 | + |
| 139 | + public func chunked<Signal>( |
| 140 | + by signal: Signal |
| 141 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal> |
| 142 | + |
| 143 | + public func chunks<C: Clock, Collected: RangeReplaceableCollection>( |
| 144 | + ofCount count: Int, |
| 145 | + or timer: AsyncTimerSequence<C>, |
| 146 | + into: Collected.Type |
| 147 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>> |
| 148 | + where Collected.Element == Element |
| 149 | + |
| 150 | + public func chunks<C: Clock>( |
| 151 | + ofCount count: Int, |
| 152 | + or timer: AsyncTimerSequence<C> |
| 153 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>> |
| 154 | + |
| 155 | + public func chunked<C: Clock, Collected: RangeReplaceableCollection>( |
| 156 | + by timer: AsyncTimerSequence<C>, |
| 157 | + into: Collected.Type |
| 158 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>> |
| 159 | + where Collected.Element == Element |
| 160 | + |
| 161 | + public func chunked<C: Clock>( |
| 162 | + by timer: AsyncTimerSequence<C> |
| 163 | + ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>> |
| 164 | +} |
| 165 | + |
| 166 | +extension AsyncSequence { |
| 167 | + public func chunks<Collected: RangeReplaceableCollection>( |
| 168 | + ofCount count: Int, |
| 169 | + into: Collected.Type |
| 170 | + ) -> AsyncChunksOfCountSequence<Self, Collected> |
| 171 | + where Collected.Element == Element |
| 172 | + |
| 173 | + public func chunks( |
| 174 | + ofCount count: Int |
| 175 | + ) -> AsyncChunksOfCountSequence<Self, [Element]> |
| 176 | +} |
| 177 | +``` |
| 178 | + |
| 179 | +```swift |
| 180 | +let packets = bytes.chunks(ofCount: 1024, into: Data.self) |
| 181 | +for try await packet in packets { |
| 182 | + write(packet) |
| 183 | +} |
| 184 | +``` |
| 185 | + |
| 186 | +```swift |
| 187 | +let fourSecondsOfLogs = logs.chunked(by: .repeating(every: .seconds(4))) |
| 188 | +``` |
| 189 | + |
| 190 | +```swift |
| 191 | +let packets = bytes.chunks(ofCount: 1024 or: .repeating(every: .seconds(1)), into: Data.self) |
| 192 | +for try await packet in packets { |
| 193 | + write(packet) |
| 194 | +} |
| 195 | +``` |
| 196 | + |
| 197 | +## Detailed Design |
| 198 | + |
| 199 | +### Grouping |
| 200 | + |
| 201 | +```swift |
| 202 | +public struct AsyncChunkedByGroupSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence |
| 203 | + where Collected.Element == Base.Element { |
| 204 | + public typealias Element = Collected |
| 205 | + |
| 206 | + public struct Iterator: AsyncIteratorProtocol { |
| 207 | + public mutating func next() async rethrows -> Collected? |
| 208 | + } |
| 209 | + |
| 210 | + public func makeAsyncIterator() -> Iterator |
| 211 | +} |
| 212 | + |
| 213 | +extension AsyncChunkedByGroupSequence: Sendable |
| 214 | + where Base: Sendable, Base.Element: Sendable { } |
| 215 | + |
| 216 | +extension AsyncChunkedByGroupSequence.Iterator: Sendable |
| 217 | + where Base.AsyncIterator: Sendable, Base.Element: Sendable { } |
| 218 | +``` |
| 219 | + |
| 220 | +### Projected Seperator |
| 221 | + |
| 222 | +```swift |
| 223 | +public struct AsyncChunkedOnProjectionSequence<Base: AsyncSequence, Subject: Equatable, Collected: RangeReplaceableCollection>: AsyncSequence where Collected.Element == Base.Element { |
| 224 | + public typealias Element = (Subject, Collected) |
| 225 | + |
| 226 | + public struct Iterator: AsyncIteratorProtocol { |
| 227 | + public mutating func next() async rethrows -> (Subject, Collected)? |
| 228 | + } |
| 229 | + |
| 230 | + public func makeAsyncIterator() -> Iterator |
| 231 | +} |
| 232 | + |
| 233 | +extension AsyncChunkedOnProjectionSequence: Sendable |
| 234 | + where Base: Sendable, Base.Element: Sendable { } |
| 235 | +extension AsyncChunkedOnProjectionSequence.Iterator: Sendable |
| 236 | + where Base.AsyncIterator: Sendable, Base.Element: Sendable, Subject: Sendable { } |
| 237 | +``` |
| 238 | + |
| 239 | +### Count |
| 240 | + |
| 241 | +```swift |
| 242 | +public struct AsyncChunksOfCountSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence |
| 243 | + where Collected.Element == Base.Element { |
| 244 | + public typealias Element = Collected |
| 245 | + |
| 246 | + public struct Iterator: AsyncIteratorProtocol { |
| 247 | + public mutating func next() async rethrows -> Collected? |
| 248 | + } |
| 249 | + |
| 250 | + public func makeAsyncIterator() -> Iterator |
| 251 | +} |
| 252 | + |
| 253 | +extension AsyncChunksOfCountSequence : Sendable where Base : Sendable, Base.Element : Sendable { } |
| 254 | +extension AsyncChunksOfCountSequence.Iterator : Sendable where Base.AsyncIterator : Sendable, Base.Element : Sendable { } |
| 255 | + |
| 256 | +``` |
| 257 | + |
| 258 | +### Count or Signal |
| 259 | + |
| 260 | +```swift |
| 261 | +public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection, Signal: AsyncSequence>: AsyncSequence, Sendable |
| 262 | + where |
| 263 | + Collected.Element == Base.Element, |
| 264 | + Base: Sendable, Signal: Sendable, |
| 265 | + Base.AsyncIterator: Sendable, Signal.AsyncIterator: Sendable, |
| 266 | + Base.Element: Sendable, Signal.Element: Sendable { |
| 267 | + public typealias Element = Collected |
| 268 | + |
| 269 | + public struct Iterator: AsyncIteratorProtocol, Sendable { |
| 270 | + public mutating func next() async rethrows -> Collected? |
| 271 | + } |
| 272 | + |
| 273 | + public func makeAsyncIterator() -> Iterator |
| 274 | +} |
| 275 | +``` |
| 276 | + |
| 277 | +## Alternatives Considered |
| 278 | + |
| 279 | +It was considered to make the chunked element to be an `AsyncSequence` instead of allowing collection into a `RangeReplacableCollection` however it was determined that the throwing behavior of that would be complex to understand. If that hurddle could be overcome then that might be a future direction/consideration that would be worth exploring. |
| 280 | + |
| 281 | +The naming of this family was considered to be `collect` which is used in APIs like `Combine`. This family of functions has distinct similarity to those APIs. |
| 282 | + |
| 283 | +## Credits/Inspiration |
| 284 | + |
| 285 | +This transformation function is a heavily inspired analog of the synchronous version [defined in the Swift Algorithms package](https://github.com/apple/swift-algorithms/blob/main/Guides/Chunked.md) |
0 commit comments