Skip to content

Commit 08cf1af

Browse files
netvldignifiedquire
authored andcommitted
fix: correctly decode incomplete reads of long IMAP messages
Previously, the `ImapStream::poll_next` method did not persist its state correctly across `Poll::Pending` results of the underlying byte stream. This resulted in an empty buffer and empty position being used for each following call of `poll_next`, which therefore caused the `decode` method being invoked not on the entire accumulated buffer, but on individual chunks of input. This commit fixes this problematic behavior by properly persisting the buffer and the current position across `poll_next` invocations. In addition, this commit replaces the (usize, usize) structure representing position with a proper struct, which makes more semantically and syntactically clear which kind of index is being used.
1 parent cc24f5d commit 08cf1af

File tree

1 file changed

+68
-22
lines changed

1 file changed

+68
-22
lines changed

src/imap_stream.rs

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,27 @@ pub struct ImapStream<R: Read + Write> {
2929
/// Buffer for the already read, but not yet parsed data.
3030
buffer: Block<'static>,
3131
/// Position of valid read data into buffer.
32-
current: (usize, usize),
32+
current: Position,
3333
/// How many bytes do we need to finishe the currrent element that is being decoded.
3434
decode_needs: usize,
35+
/// Whether we should attempt to decode whatever is currently inside the buffer.
36+
/// False indicates that we know for certain that the buffer is incomplete.
37+
initial_decode: bool,
38+
}
39+
40+
/// A semantically explicit slice of a buffer.
41+
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
42+
struct Position {
43+
start: usize,
44+
end: usize,
45+
}
46+
47+
impl Position {
48+
const ZERO: Position = Position { start: 0, end: 0 };
49+
50+
const fn new(start: usize, end: usize) -> Position {
51+
Position { start, end }
52+
}
3553
}
3654

3755
enum DecodeResult {
@@ -70,13 +88,14 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
7088
ImapStream {
7189
inner,
7290
buffer: POOL.alloc(INITIAL_CAPACITY),
73-
current: (0, 0),
91+
current: Position::ZERO,
7492
decode_needs: 0,
93+
initial_decode: false, // buffer is empty initially, nothing to decode
7594
}
7695
}
7796

7897
pub async fn encode(&mut self, msg: Request) -> Result<(), io::Error> {
79-
log::trace!("> {:?}", msg);
98+
log::trace!("encode: input: {:?}", msg);
8099

81100
if let Some(tag) = msg.0 {
82101
self.inner.write_all(tag.as_bytes()).await?;
@@ -109,7 +128,7 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
109128
start: usize,
110129
end: usize,
111130
) -> io::Result<DecodeResult> {
112-
log::trace!("< {:?}", std::str::from_utf8(&buf[start..end]));
131+
log::trace!("decode: input: {:?}", std::str::from_utf8(&buf[start..end]));
113132

114133
let mut rest = None;
115134
let mut used = 0;
@@ -128,10 +147,14 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
128147
Ok(response)
129148
}
130149
Err(nom::Err::Incomplete(Needed::Size(min))) => {
150+
log::trace!("decode: incomplete data, need minimum {} bytes", min);
131151
self.decode_needs = min;
132152
Err(None)
133153
}
134-
Err(nom::Err::Incomplete(_)) => Err(None),
154+
Err(nom::Err::Incomplete(_)) => {
155+
log::trace!("decode: incomplete data, need unknown number of bytes");
156+
Err(None)
157+
}
135158
Err(err) => Err(Some(io::Error::new(
136159
io::ErrorKind::Other,
137160
format!("{:?} during parsing of {:?}", err, &buf[start..end]),
@@ -157,22 +180,26 @@ impl<R: Read + Write + Unpin> Stream for ImapStream<R> {
157180
type Item = io::Result<ResponseData>;
158181

159182
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183+
// The `poll_next` method must strive to be as idempotent as possible if the underlying
184+
// future/stream is not yet ready to produce results. It means that we must be careful
185+
// to persist the state of polling between calls to `poll_next`, specifically,
186+
// we must always restore the buffer and the current position before any `return`s.
187+
160188
let this = &mut *self;
161189

162-
let mut n = this.current;
163-
this.current = (0, 0);
190+
let mut n = std::mem::replace(&mut this.current, Position::ZERO);
164191
let buffer = std::mem::replace(&mut this.buffer, POOL.alloc(INITIAL_CAPACITY));
165192

166-
let mut buffer = if (n.1 - n.0) > 0 {
167-
match this.decode(buffer, n.0, n.1)? {
193+
let mut buffer = if (n.end - n.start) > 0 && this.initial_decode {
194+
match this.decode(buffer, n.start, n.end)? {
168195
DecodeResult::Some {
169196
response,
170197
buffer,
171198
used,
172199
} => {
173-
this.current = (0, used);
200+
// initial_decode is still true
174201
std::mem::replace(&mut this.buffer, buffer);
175-
202+
this.current = Position::new(0, used);
176203
return Poll::Ready(Some(Ok(response)));
177204
}
178205
DecodeResult::None(buffer) => buffer,
@@ -182,44 +209,63 @@ impl<R: Read + Write + Unpin> Stream for ImapStream<R> {
182209
};
183210

184211
loop {
185-
if (n.1 - n.0) + this.decode_needs >= buffer.capacity() {
212+
if (n.end - n.start) + this.decode_needs >= buffer.capacity() {
186213
if buffer.capacity() + this.decode_needs < MAX_CAPACITY {
187214
buffer.realloc(buffer.capacity() + this.decode_needs);
188215
} else {
216+
std::mem::replace(&mut this.buffer, buffer);
217+
this.current = n;
189218
return Poll::Ready(Some(Err(io::Error::new(
190219
io::ErrorKind::Other,
191220
"incoming data too large",
192221
))));
193222
}
194223
}
195224

196-
n.1 += futures::ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buffer[n.1..]))?;
225+
let bytes_read = match Pin::new(&mut this.inner).poll_read(cx, &mut buffer[n.end..]) {
226+
Poll::Ready(result) => result?,
227+
Poll::Pending => {
228+
// if we're here, it means that we need more data but there is none yet,
229+
// so no decoding attempts are necessary until we get more data
230+
this.initial_decode = false;
197231

198-
match this.decode(buffer, n.0, n.1)? {
232+
std::mem::replace(&mut this.buffer, buffer);
233+
this.current = n;
234+
return Poll::Pending;
235+
}
236+
};
237+
n.end += bytes_read;
238+
239+
match this.decode(buffer, n.start, n.end)? {
199240
DecodeResult::Some {
200241
response,
201242
buffer,
202243
used,
203244
} => {
204-
this.current = (0, used);
205-
std::mem::replace(&mut this.buffer, buffer);
245+
// current buffer might now contain more data inside, so we need to attempt
246+
// to decode it next time
247+
this.initial_decode = true;
206248

249+
std::mem::replace(&mut this.buffer, buffer);
250+
this.current = Position::new(0, used);
207251
return Poll::Ready(Some(Ok(response)));
208252
}
209253
DecodeResult::None(buf) => {
210254
buffer = buf;
211255

212-
if this.buffer.is_empty() || (n.0 == 0 && n.1 == 0) {
213-
// put back the buffer to reuse it
256+
if this.buffer.is_empty() || n == Position::ZERO {
257+
// "logical buffer" is empty, there is nothing to decode on the next step
258+
this.initial_decode = false;
259+
214260
std::mem::replace(&mut this.buffer, buffer);
215261
this.current = n;
216-
217262
return Poll::Ready(None);
218-
} else if (n.1 - n.0) == 0 {
219-
// put back the buffer to reuse it
263+
} else if (n.end - n.start) == 0 {
264+
// "logical buffer" is empty, there is nothing to decode on the next step
265+
this.initial_decode = false;
266+
220267
std::mem::replace(&mut this.buffer, buffer);
221268
this.current = n;
222-
223269
return Poll::Ready(Some(Err(io::Error::new(
224270
io::ErrorKind::UnexpectedEof,
225271
"bytes remaining in stream",

0 commit comments

Comments
 (0)