Skip to content

Commit 38e8937

Browse files
committed
Add stream::flatten implementation
1 parent 1869e04 commit 38e8937

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Stream
3838
- [x] stream::concat
3939
- [x] stream::filter
4040
- [x] stream::filter_map
41-
- [ ] stream::flatten
41+
- [x] stream::flatten
4242
- [ ] stream::fold
4343
- [x] stream::for_each
4444
- [ ] stream::for_each_concurrent

src/stream.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,33 @@ pub fn repeat<T>(item: T) -> impl Stream<Item = T>
144144
})
145145
}
146146

147+
pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
148+
where SubSt: Stream<Item = T>,
149+
St: Stream<Item = SubSt>,
150+
{
151+
let stream = Box::pin(stream);
152+
futures::stream::unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
153+
loop {
154+
if let Some(mut substream) = state_substream.take() {
155+
if let Some(item) = await!(next(&mut substream)) {
156+
return Some((item, (state_stream, Some(substream))))
157+
} else {
158+
continue;
159+
}
160+
}
161+
if let Some(mut stream) = state_stream.take() {
162+
if let Some(substream) = await!(next(&mut stream)) {
163+
let substream = Box::pin(substream);
164+
state_stream = Some(stream);
165+
state_substream = Some(substream);
166+
continue;
167+
}
168+
}
169+
return None;
170+
}
171+
})
172+
}
173+
147174
#[cfg(test)]
148175
mod tests {
149176
use futures::executor;
@@ -256,4 +283,16 @@ mod tests {
256283

257284
assert_eq!(vec![9, 9, 9], executor::block_on(collect::<_, Vec<_>>(stream)));
258285
}
286+
287+
#[test]
288+
fn test_flatten() {
289+
let stream0 = iter(0..0);
290+
let stream1 = iter(1..4);
291+
let stream2 = iter(4..7);
292+
let stream3 = iter(7..10);
293+
let stream = iter(vec![stream0, stream1, stream2, stream3]);
294+
let stream = flatten(stream);
295+
296+
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 9], executor::block_on(collect::<_, Vec<_>>(stream)));
297+
}
259298
}

0 commit comments

Comments
 (0)