Skip to content

Commit 63980d4

Browse files
committed
Add stream::zip implementation
1 parent 0bff08a commit 63980d4

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

src/stream.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,22 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
209209
})
210210
}
211211

212+
pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item, St2::Item)>
213+
where St1: Stream,
214+
St2: Stream,
215+
{
216+
let stream = Box::pin(stream);
217+
let other = Box::pin(other);
218+
futures::stream::unfold((stream, other), async move | (mut stream, mut other)| {
219+
let left = await!(next(&mut stream));
220+
let right = await!(next(&mut other));
221+
match (left, right) {
222+
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
223+
_ => None
224+
}
225+
})
226+
}
227+
212228
#[cfg(test)]
213229
mod tests {
214230
use futures::executor;
@@ -349,4 +365,13 @@ mod tests {
349365

350366
assert_eq!(vec![6, 7, 8, 9, 10], executor::block_on(collect::<_, Vec<_>>(stream)));
351367
}
368+
369+
#[test]
370+
fn test_zip() {
371+
let stream1 = iter(1..=3);
372+
let stream2 = iter(5..=10);
373+
let stream = zip(stream1, stream2);
374+
375+
assert_eq!(vec![(1, 5), (2, 6), (3, 7)], executor::block_on(collect::<_, Vec<_>>(stream)));
376+
}
352377
}

0 commit comments

Comments
 (0)