Skip to content

Commit 61e2e6a

Browse files
committed
Add stream::then implementation
1 parent 71a1d17 commit 61e2e6a

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

src/stream.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,23 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
171171
})
172172
}
173173

174+
pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
175+
where St: Stream,
176+
F: FnMut(St::Item) -> Fut,
177+
Fut: Future<Output = St::Item>
178+
{
179+
let stream = Box::pin(stream);
180+
futures::stream::unfold((stream, f), async move | (mut stream, mut f)| {
181+
let item = await!(next(&mut stream));
182+
if let Some(item) = item {
183+
let new_item = await!(f(item));
184+
Some((new_item, (stream, f)))
185+
} else {
186+
None
187+
}
188+
})
189+
}
190+
174191
#[cfg(test)]
175192
mod tests {
176193
use futures::executor;
@@ -295,4 +312,12 @@ mod tests {
295312

296313
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 9], executor::block_on(collect::<_, Vec<_>>(stream)));
297314
}
315+
316+
#[test]
317+
fn test_then() {
318+
let stream = iter(1..=3);
319+
let stream = then(stream, |x| ready(x+3));
320+
321+
assert_eq!(vec![4, 5, 6], executor::block_on(collect::<_, Vec<_>>(stream)));
322+
}
298323
}

0 commit comments

Comments
 (0)