Skip to content

Commit 62bc2d5

Browse files
authored
Fix AsyncIter implementation of stream. (#1648)
When the `safe_iterators` feature isn't implemented, the `Stream` implementation of AsyncIter should return T, not `RedisResult<T>`.
1 parent 22bf7b8 commit 62bc2d5

File tree

4 files changed

+20
-2
lines changed

4 files changed

+20
-2
lines changed

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ test:
1212
@echo "===================================================================="
1313
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo nextest run --locked -p redis --no-default-features -F safe_iterators -E 'not test(test_module)'
1414

15+
@echo "===================================================================="
16+
@echo "Testing Connection Type TCP without safe_iterators, but with async"
17+
@echo "===================================================================="
18+
@RUSTFLAGS="-D warnings -A deprecated" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo nextest run --locked -p redis --no-default-features -F tokio-comp -E 'not test(test_module)'
19+
1520
@echo "===================================================================="
1621
@echo "Testing Connection Type TCP with all features and RESP2"
1722
@echo "===================================================================="

redis/examples/async-scan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ async fn main() -> redis::RedisResult<()> {
1010
let _: () = con.set("async-key2", b"foo").await?;
1111

1212
let iter: AsyncIter<String> = con.scan().await?;
13+
#[cfg(not(feature = "safe_iterators"))]
14+
let mut keys: Vec<String> = iter.collect().await;
15+
#[cfg(feature = "safe_iterators")]
1316
let mut keys: Vec<_> = iter.map(std::result::Result::unwrap).collect().await;
1417

1518
keys.sort();

redis/src/cmd.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,13 +262,16 @@ impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
262262
#[cfg(not(feature = "safe_iterators"))]
263263
#[inline]
264264
pub async fn next_item(&mut self) -> Option<T> {
265-
StreamExt::next(self).await?.ok()
265+
StreamExt::next(self).await
266266
}
267267
}
268268

269269
#[cfg(feature = "aio")]
270270
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
271+
#[cfg(feature = "safe_iterators")]
271272
type Item = RedisResult<T>;
273+
#[cfg(not(feature = "safe_iterators"))]
274+
type Item = T;
272275

273276
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
274277
let this = self.get_mut();
@@ -289,7 +292,11 @@ impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
289292
}
290293
Poll::Ready((iter, value)) => {
291294
this.inner = IterOrFuture::Iter(iter);
292-
Poll::Ready(value)
295+
296+
#[cfg(feature = "safe_iterators")]
297+
return Poll::Ready(value);
298+
#[cfg(not(feature = "safe_iterators"))]
299+
Poll::Ready(value.map(|res| res.ok()).flatten())
293300
}
294301
},
295302
IterOrFuture::Empty => unreachable!(),

redis/tests/test_async.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,9 @@ mod basic_async {
939939
}
940940

941941
let iter: redis::AsyncIter<String> = con.scan().await.unwrap();
942+
#[cfg(not(feature = "safe_iterators"))]
943+
let mut keys_from_redis: Vec<String> = iter.collect().await;
944+
#[cfg(feature = "safe_iterators")]
942945
let mut keys_from_redis: Vec<_> =
943946
iter.map(std::result::Result::unwrap).collect().await;
944947
keys_from_redis.sort();

0 commit comments

Comments
 (0)