1
1
use std:: collections:: HashSet ;
2
2
3
+ use async_std:: channel;
3
4
use async_std:: io;
4
5
use async_std:: prelude:: * ;
5
6
use async_std:: stream:: Stream ;
6
- use async_std:: sync;
7
7
use imap_proto:: { self , MailboxDatum , RequestId , Response } ;
8
8
9
9
use crate :: error:: { Error , Result } ;
10
10
use crate :: types:: ResponseData ;
11
11
use crate :: types:: * ;
12
12
13
- pub ( crate ) fn parse_names < ' a , T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
14
- stream : & ' a mut T ,
15
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
13
+ pub ( crate ) fn parse_names < T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
14
+ stream : & mut T ,
15
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
16
16
command_tag : RequestId ,
17
- ) -> impl Stream < Item = Result < Name > > + ' a + Send + Unpin {
17
+ ) -> impl Stream < Item = Result < Name > > + ' _ + Send + Unpin {
18
18
use futures:: { FutureExt , StreamExt } ;
19
19
20
20
StreamExt :: filter_map (
@@ -56,11 +56,11 @@ fn filter_sync(res: &io::Result<ResponseData>, command_tag: &RequestId) -> bool
56
56
}
57
57
}
58
58
59
- pub ( crate ) fn parse_fetches < ' a , T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
60
- stream : & ' a mut T ,
61
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
59
+ pub ( crate ) fn parse_fetches < T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
60
+ stream : & mut T ,
61
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
62
62
command_tag : RequestId ,
63
- ) -> impl Stream < Item = Result < Fetch > > + ' a + Send + Unpin {
63
+ ) -> impl Stream < Item = Result < Fetch > > + ' _ + Send + Unpin {
64
64
use futures:: { FutureExt , StreamExt } ;
65
65
66
66
StreamExt :: filter_map (
@@ -85,11 +85,11 @@ pub(crate) fn parse_fetches<'a, T: Stream<Item = io::Result<ResponseData>> + Unp
85
85
)
86
86
}
87
87
88
- pub ( crate ) fn parse_expunge < ' a , T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
89
- stream : & ' a mut T ,
90
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
88
+ pub ( crate ) fn parse_expunge < T : Stream < Item = io:: Result < ResponseData > > + Unpin + Send > (
89
+ stream : & mut T ,
90
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
91
91
command_tag : RequestId ,
92
- ) -> impl Stream < Item = Result < u32 > > + ' a + Send {
92
+ ) -> impl Stream < Item = Result < u32 > > + ' _ + Send {
93
93
use futures:: StreamExt ;
94
94
95
95
StreamExt :: filter_map (
@@ -113,9 +113,9 @@ pub(crate) fn parse_expunge<'a, T: Stream<Item = io::Result<ResponseData>> + Unp
113
113
)
114
114
}
115
115
116
- pub ( crate ) async fn parse_capabilities < ' a , T : Stream < Item = io:: Result < ResponseData > > + Unpin > (
117
- stream : & ' a mut T ,
118
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
116
+ pub ( crate ) async fn parse_capabilities < T : Stream < Item = io:: Result < ResponseData > > + Unpin > (
117
+ stream : & mut T ,
118
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
119
119
command_tag : RequestId ,
120
120
) -> Result < Capabilities > {
121
121
let mut caps: HashSet < Capability > = HashSet :: new ( ) ;
@@ -143,7 +143,7 @@ pub(crate) async fn parse_capabilities<'a, T: Stream<Item = io::Result<ResponseD
143
143
144
144
pub ( crate ) async fn parse_noop < T : Stream < Item = io:: Result < ResponseData > > + Unpin > (
145
145
stream : & mut T ,
146
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
146
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
147
147
command_tag : RequestId ,
148
148
) -> Result < ( ) > {
149
149
while let Some ( resp) = stream
@@ -160,7 +160,7 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi
160
160
161
161
pub ( crate ) async fn parse_mailbox < T : Stream < Item = io:: Result < ResponseData > > + Unpin > (
162
162
stream : & mut T ,
163
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
163
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
164
164
command_tag : RequestId ,
165
165
) -> Result < Mailbox > {
166
166
let mut mailbox = Mailbox :: default ( ) ;
@@ -252,7 +252,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
252
252
253
253
pub ( crate ) async fn parse_ids < T : Stream < Item = io:: Result < ResponseData > > + Unpin > (
254
254
stream : & mut T ,
255
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
255
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
256
256
command_tag : RequestId ,
257
257
) -> Result < HashSet < u32 > > {
258
258
let mut ids: HashSet < u32 > = HashSet :: new ( ) ;
@@ -282,7 +282,7 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin
282
282
// (see Section 7 of RFC 3501):
283
283
pub ( crate ) async fn handle_unilateral (
284
284
res : ResponseData ,
285
- unsolicited : sync :: Sender < UnsolicitedResponse > ,
285
+ unsolicited : channel :: Sender < UnsolicitedResponse > ,
286
286
) {
287
287
// ignore these if they are not being consumed
288
288
if unsolicited. is_full ( ) {
@@ -307,19 +307,32 @@ pub(crate) async fn handle_unilateral(
307
307
} )
308
308
. collect ( ) ,
309
309
} )
310
- . await ;
310
+ . await
311
+ . expect ( "Channel closed unexpectedly" ) ;
311
312
}
312
313
Response :: MailboxData ( MailboxDatum :: Recent ( n) ) => {
313
- unsolicited. send ( UnsolicitedResponse :: Recent ( * n) ) . await ;
314
+ unsolicited
315
+ . send ( UnsolicitedResponse :: Recent ( * n) )
316
+ . await
317
+ . expect ( "Channel closed unexpectedly" ) ;
314
318
}
315
319
Response :: MailboxData ( MailboxDatum :: Exists ( n) ) => {
316
- unsolicited. send ( UnsolicitedResponse :: Exists ( * n) ) . await ;
320
+ unsolicited
321
+ . send ( UnsolicitedResponse :: Exists ( * n) )
322
+ . await
323
+ . expect ( "Channel closed unexpectedly" ) ;
317
324
}
318
325
Response :: Expunge ( n) => {
319
- unsolicited. send ( UnsolicitedResponse :: Expunge ( * n) ) . await ;
326
+ unsolicited
327
+ . send ( UnsolicitedResponse :: Expunge ( * n) )
328
+ . await
329
+ . expect ( "Channel closed unexpectedly" ) ;
320
330
}
321
331
_ => {
322
- unsolicited. send ( UnsolicitedResponse :: Other ( res) ) . await ;
332
+ unsolicited
333
+ . send ( UnsolicitedResponse :: Other ( res) )
334
+ . await
335
+ . expect ( "Channel closed unexpectedly" ) ;
323
336
}
324
337
}
325
338
}
@@ -350,7 +363,7 @@ mod tests {
350
363
input_stream ( & [ "* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r \n " ] ) ;
351
364
352
365
let mut stream = async_std:: stream:: from_iter ( responses) ;
353
- let ( send, recv) = sync :: channel ( 10 ) ;
366
+ let ( send, recv) = channel :: bounded ( 10 ) ;
354
367
let id = RequestId ( "A0001" . into ( ) ) ;
355
368
let capabilities = parse_capabilities ( & mut stream, send, id) . await . unwrap ( ) ;
356
369
// shouldn't be any unexpected responses parsed
@@ -368,7 +381,7 @@ mod tests {
368
381
let responses = input_stream ( & [ "* CAPABILITY IMAP4REV1 STARTTLS\r \n " ] ) ;
369
382
let mut stream = async_std:: stream:: from_iter ( responses) ;
370
383
371
- let ( send, recv) = sync :: channel ( 10 ) ;
384
+ let ( send, recv) = channel :: bounded ( 10 ) ;
372
385
let id = RequestId ( "A0001" . into ( ) ) ;
373
386
let capabilities = parse_capabilities ( & mut stream, send, id) . await . unwrap ( ) ;
374
387
@@ -383,7 +396,7 @@ mod tests {
383
396
#[ async_std:: test]
384
397
#[ should_panic]
385
398
async fn parse_capability_invalid_test ( ) {
386
- let ( send, recv) = sync :: channel ( 10 ) ;
399
+ let ( send, recv) = channel :: bounded ( 10 ) ;
387
400
let responses = input_stream ( & [ "* JUNK IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r \n " ] ) ;
388
401
let mut stream = async_std:: stream:: from_iter ( responses) ;
389
402
@@ -396,7 +409,7 @@ mod tests {
396
409
397
410
#[ async_std:: test]
398
411
async fn parse_names_test ( ) {
399
- let ( send, recv) = sync :: channel ( 10 ) ;
412
+ let ( send, recv) = channel :: bounded ( 10 ) ;
400
413
let responses = input_stream ( & [ "* LIST (\\ HasNoChildren) \" .\" \" INBOX\" \r \n " ] ) ;
401
414
let mut stream = async_std:: stream:: from_iter ( responses) ;
402
415
@@ -417,7 +430,7 @@ mod tests {
417
430
418
431
#[ async_std:: test]
419
432
async fn parse_fetches_empty ( ) {
420
- let ( send, recv) = sync :: channel ( 10 ) ;
433
+ let ( send, recv) = channel :: bounded ( 10 ) ;
421
434
let responses = input_stream ( & [ ] ) ;
422
435
let mut stream = async_std:: stream:: from_iter ( responses) ;
423
436
let id = RequestId ( "a" . into ( ) ) ;
@@ -432,7 +445,7 @@ mod tests {
432
445
433
446
#[ async_std:: test]
434
447
async fn parse_fetches_test ( ) {
435
- let ( send, recv) = sync :: channel ( 10 ) ;
448
+ let ( send, recv) = channel :: bounded ( 10 ) ;
436
449
let responses = input_stream ( & [
437
450
"* 24 FETCH (FLAGS (\\ Seen) UID 4827943)\r \n " ,
438
451
"* 25 FETCH (FLAGS (\\ Seen))\r \n " ,
@@ -462,7 +475,7 @@ mod tests {
462
475
#[ async_std:: test]
463
476
async fn parse_fetches_w_unilateral ( ) {
464
477
// https://github.com/mattnenterprise/rust-imap/issues/81
465
- let ( send, recv) = sync :: channel ( 10 ) ;
478
+ let ( send, recv) = channel :: bounded ( 10 ) ;
466
479
let responses = input_stream ( & [ "* 37 FETCH (UID 74)\r \n " , "* 1 RECENT\r \n " ] ) ;
467
480
let mut stream = async_std:: stream:: from_iter ( responses) ;
468
481
let id = RequestId ( "a" . into ( ) ) ;
@@ -480,7 +493,7 @@ mod tests {
480
493
481
494
#[ async_std:: test]
482
495
async fn parse_names_w_unilateral ( ) {
483
- let ( send, recv) = sync :: channel ( 10 ) ;
496
+ let ( send, recv) = channel :: bounded ( 10 ) ;
484
497
let responses = input_stream ( & [
485
498
"* LIST (\\ HasNoChildren) \" .\" \" INBOX\" \r \n " ,
486
499
"* 4 EXPUNGE\r \n " ,
@@ -506,7 +519,7 @@ mod tests {
506
519
507
520
#[ async_std:: test]
508
521
async fn parse_capabilities_w_unilateral ( ) {
509
- let ( send, recv) = sync :: channel ( 10 ) ;
522
+ let ( send, recv) = channel :: bounded ( 10 ) ;
510
523
let responses = input_stream ( & [
511
524
"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r \n " ,
512
525
"* STATUS dev.github (MESSAGES 10 UIDNEXT 11 UIDVALIDITY 1408806928 UNSEEN 0)\r \n " ,
@@ -541,7 +554,7 @@ mod tests {
541
554
542
555
#[ async_std:: test]
543
556
async fn parse_ids_w_unilateral ( ) {
544
- let ( send, recv) = sync :: channel ( 10 ) ;
557
+ let ( send, recv) = channel :: bounded ( 10 ) ;
545
558
let responses = input_stream ( & [
546
559
"* SEARCH 23 42 4711\r \n " ,
547
560
"* 1 RECENT\r \n " ,
@@ -571,7 +584,7 @@ mod tests {
571
584
572
585
#[ async_std:: test]
573
586
async fn parse_ids_test ( ) {
574
- let ( send, recv) = sync :: channel ( 10 ) ;
587
+ let ( send, recv) = channel :: bounded ( 10 ) ;
575
588
let responses = input_stream ( & [
576
589
"* SEARCH 1600 1698 1739 1781 1795 1885 1891 1892 1893 1898 1899 1901 1911 1926 1932 1933 1993 1994 2007 2032 2033 2041 2053 2062 2063 2065 2066 2072 2078 2079 2082 2084 2095 2100 2101 2102 2103 2104 2107 2116 2120 2135 2138 2154 2163 2168 2172 2189 2193 2198 2199 2205 2212 2213 2221 2227 2267 2275 2276 2295 2300 2328 2330 2332 2333 2334\r \n " ,
577
590
"* SEARCH 2335 2336 2337 2338 2339 2341 2342 2347 2349 2350 2358 2359 2362 2369 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2390 2392 2397 2400 2401 2403 2405 2409 2411 2414 2417 2419 2420 2424 2426 2428 2439 2454 2456 2467 2468 2469 2490 2515 2519 2520 2521\r \n " ,
@@ -604,7 +617,7 @@ mod tests {
604
617
605
618
#[ async_std:: test]
606
619
async fn parse_ids_search ( ) {
607
- let ( send, recv) = sync :: channel ( 10 ) ;
620
+ let ( send, recv) = channel :: bounded ( 10 ) ;
608
621
let responses = input_stream ( & [ "* SEARCH\r \n " ] ) ;
609
622
let mut stream = async_std:: stream:: from_iter ( responses) ;
610
623
0 commit comments