Skip to content

Commit 62cac49

Browse files
committed
Make notification iterators fallible
1 parent 44bf1c9 commit 62cac49

File tree

2 files changed

+47
-45
lines changed

2 files changed

+47
-45
lines changed

src/notification.rs

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Asynchronous notifications.
22
3+
use fallible_iterator::{FallibleIterator, IntoFallibleIterator};
34
use std::fmt;
45
use std::time::Duration;
56

@@ -42,7 +43,7 @@ impl<'conn> Notifications<'conn> {
4243
self.len() == 0
4344
}
4445

45-
/// Returns an iterator over pending notifications.
46+
/// Returns a fallible iterator over pending notifications.
4647
///
4748
/// # Note
4849
///
@@ -52,16 +53,16 @@ impl<'conn> Notifications<'conn> {
5253
Iter { conn: self.conn }
5354
}
5455

55-
/// Returns an iterator over notifications that blocks until one is
56+
/// Returns a fallible iterator over notifications that blocks until one is
5657
/// received if none are pending.
5758
///
5859
/// The iterator will never return `None`.
5960
pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> {
6061
BlockingIter { conn: self.conn }
6162
}
6263

63-
/// Returns an iterator over notifications that blocks for a limited time
64-
/// waiting to receive one if none are pending.
64+
/// Returns a fallible iterator over notifications that blocks for a limited
65+
/// time waiting to receive one if none are pending.
6566
///
6667
/// # Note
6768
///
@@ -75,11 +76,12 @@ impl<'conn> Notifications<'conn> {
7576
}
7677
}
7778

78-
impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> {
79-
type Item = Result<Notification>;
79+
impl<'a, 'conn> IntoFallibleIterator for &'a Notifications<'conn> {
80+
type Item = Notification;
81+
type Error = Error;
8082
type IntoIter = Iter<'a>;
8183

82-
fn into_iter(self) -> Iter<'a> {
84+
fn into_fallible_iterator(self) -> Iter<'a> {
8385
self.iter()
8486
}
8587
}
@@ -90,35 +92,36 @@ impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
9092
}
9193
}
9294

93-
/// An iterator over pending notifications.
95+
/// A fallible iterator over pending notifications.
9496
pub struct Iter<'a> {
9597
conn: &'a Connection,
9698
}
9799

98-
impl<'a> Iterator for Iter<'a> {
99-
type Item = Result<Notification>;
100+
impl<'a> FallibleIterator for Iter<'a> {
101+
type Item = Notification;
102+
type Error = Error;
100103

101-
fn next(&mut self) -> Option<Result<Notification>> {
104+
fn next(&mut self) -> Result<Option<Notification>> {
102105
let mut conn = self.conn.conn.borrow_mut();
103106

104107
if let Some(notification) = conn.notifications.pop_front() {
105-
return Some(Ok(notification));
108+
return Ok(Some(notification));
106109
}
107110

108111
if conn.is_desynchronized() {
109-
return Some(Err(Error::Io(desynchronized())));
112+
return Err(Error::Io(desynchronized()));
110113
}
111114

112115
match conn.read_message_with_notification_nonblocking() {
113116
Ok(Some(Backend::NotificationResponse { process_id, channel, payload })) => {
114-
Some(Ok(Notification {
117+
Ok(Some(Notification {
115118
process_id: process_id,
116119
channel: channel,
117120
payload: payload,
118121
}))
119122
}
120-
Ok(None) => None,
121-
Err(err) => Some(Err(Error::Io(err))),
123+
Ok(None) => Ok(None),
124+
Err(err) => Err(Error::Io(err)),
122125
_ => unreachable!(),
123126
}
124127
}
@@ -133,36 +136,33 @@ pub struct BlockingIter<'a> {
133136
conn: &'a Connection,
134137
}
135138

136-
impl<'a> Iterator for BlockingIter<'a> {
137-
type Item = Result<Notification>;
139+
impl<'a> FallibleIterator for BlockingIter<'a> {
140+
type Item = Notification;
141+
type Error = Error;
138142

139-
fn next(&mut self) -> Option<Result<Notification>> {
143+
fn next(&mut self) -> Result<Option<Notification>> {
140144
let mut conn = self.conn.conn.borrow_mut();
141145

142146
if let Some(notification) = conn.notifications.pop_front() {
143-
return Some(Ok(notification));
147+
return Ok(Some(notification));
144148
}
145149

146150
if conn.is_desynchronized() {
147-
return Some(Err(Error::Io(desynchronized())));
151+
return Err(Error::Io(desynchronized()));
148152
}
149153

150154
match conn.read_message_with_notification() {
151155
Ok(Backend::NotificationResponse { process_id, channel, payload }) => {
152-
Some(Ok(Notification {
156+
Ok(Some(Notification {
153157
process_id: process_id,
154158
channel: channel,
155159
payload: payload,
156160
}))
157161
}
158-
Err(err) => Some(Err(Error::Io(err))),
162+
Err(err) => Err(Error::Io(err)),
159163
_ => unreachable!(),
160164
}
161165
}
162-
163-
fn size_hint(&self) -> (usize, Option<usize>) {
164-
(usize::max_value(), None)
165-
}
166166
}
167167

168168
/// An iterator over notifications which will block for a period of time if
@@ -172,30 +172,31 @@ pub struct TimeoutIter<'a> {
172172
timeout: Duration,
173173
}
174174

175-
impl<'a> Iterator for TimeoutIter<'a> {
176-
type Item = Result<Notification>;
175+
impl<'a> FallibleIterator for TimeoutIter<'a> {
176+
type Item = Notification;
177+
type Error = Error;
177178

178-
fn next(&mut self) -> Option<Result<Notification>> {
179+
fn next(&mut self) -> Result<Option<Notification>> {
179180
let mut conn = self.conn.conn.borrow_mut();
180181

181182
if let Some(notification) = conn.notifications.pop_front() {
182-
return Some(Ok(notification));
183+
return Ok(Some(notification));
183184
}
184185

185186
if conn.is_desynchronized() {
186-
return Some(Err(Error::Io(desynchronized())));
187+
return Err(Error::Io(desynchronized()));
187188
}
188189

189190
match conn.read_message_with_notification_timeout(self.timeout) {
190191
Ok(Some(Backend::NotificationResponse { process_id, channel, payload })) => {
191-
Some(Ok(Notification {
192+
Ok(Some(Notification {
192193
process_id: process_id,
193194
channel: channel,
194195
payload: payload,
195196
}))
196197
}
197-
Ok(None) => None,
198-
Err(err) => Some(Err(Error::Io(err))),
198+
Ok(None) => Ok(None),
199+
Err(err) => Err(Error::Io(err)),
199200
_ => unreachable!(),
200201
}
201202
}

tests/test.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
extern crate fallible_iterator;
12
#[macro_use]
23
extern crate postgres;
34
extern crate url;
@@ -6,10 +7,7 @@ extern crate openssl;
67
#[cfg(feature = "with-security-framework")]
78
extern crate security_framework;
89

9-
use std::thread;
10-
use std::io;
11-
use std::time::Duration;
12-
10+
use fallible_iterator::FallibleIterator;
1311
use postgres::{HandleNotice, Connection, GenericConnection, TlsMode};
1412
use postgres::transaction::{self, IsolationLevel};
1513
use postgres::error::{Error, ConnectError, DbError};
@@ -24,6 +22,9 @@ use postgres::error::ErrorPosition::Normal;
2422
use postgres::rows::RowIndex;
2523
use postgres::notification::Notification;
2624
use postgres::params::IntoConnectParams;
25+
use std::thread;
26+
use std::io;
27+
use std::time::Duration;
2728

2829
macro_rules! or_panic {
2930
($e:expr) => (
@@ -557,7 +558,7 @@ fn test_custom_notice_handler() {
557558
#[test]
558559
fn test_notification_iterator_none() {
559560
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", TlsMode::None));
560-
assert!(conn.notifications().iter().next().is_none());
561+
assert!(conn.notifications().iter().next().unwrap().is_none());
561562
}
562563

563564
fn check_notification(expected: Notification, actual: Notification) {
@@ -585,15 +586,15 @@ fn test_notification_iterator_some() {
585586
channel: "test_notification_iterator_one_channel2".to_string(),
586587
payload: "world".to_string()
587588
}, it.next().unwrap().unwrap());
588-
assert!(it.next().is_none());
589+
assert!(it.next().unwrap().is_none());
589590

590591
or_panic!(conn.execute("NOTIFY test_notification_iterator_one_channel, '!'", &[]));
591592
check_notification(Notification {
592593
process_id: 0,
593594
channel: "test_notification_iterator_one_channel".to_string(),
594595
payload: "!".to_string()
595596
}, it.next().unwrap().unwrap());
596-
assert!(it.next().is_none());
597+
assert!(it.next().unwrap().is_none());
597598
}
598599

599600
#[test]
@@ -612,7 +613,7 @@ fn test_notifications_next_block() {
612613
process_id: 0,
613614
channel: "test_notifications_next_block".to_string(),
614615
payload: "foo".to_string()
615-
}, or_panic!(notifications.blocking_iter().next().unwrap()));
616+
}, notifications.blocking_iter().next().unwrap().unwrap());
616617
}
617618

618619
#[test]
@@ -634,9 +635,9 @@ fn test_notification_next_timeout() {
634635
process_id: 0,
635636
channel: "test_notifications_next_timeout".to_string(),
636637
payload: "foo".to_string()
637-
}, or_panic!(it.next().unwrap()));
638+
}, it.next().unwrap().unwrap());
638639

639-
assert!(it.next().is_none());
640+
assert!(it.next().unwrap().is_none());
640641
}
641642

642643
#[test]

0 commit comments

Comments
 (0)