Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit f496cea

Browse files
committed
rework error handling
1 parent a058073 commit f496cea

File tree

3 files changed

+112
-43
lines changed

3 files changed

+112
-43
lines changed

src/main.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,11 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
2626

2727
let (broker_sender, broker_receiver) = mpsc::unbounded();
2828
let broker = task::spawn(broker(broker_receiver));
29-
let mut readers = Vec::new();
3029
let mut incoming = listener.incoming();
3130
while let Some(stream) = incoming.next().await {
3231
let stream = stream?;
3332
println!("Accepting from: {}", stream.peer_addr()?);
34-
let handle = task::spawn(client(broker_sender.clone(), stream));
35-
readers.push(handle);
36-
}
37-
for reader in readers {
38-
reader.await?;
33+
spawn_and_log_error(client(broker_sender.clone(), stream));
3934
}
4035
broker.await?;
4136
Ok(())
@@ -96,7 +91,6 @@ enum Event {
9691

9792
async fn broker(mut events: Receiver<Event>) -> Result<()> {
9893
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
99-
let mut writers = Vec::new();
10094

10195
while let Some(event) = events.next().await {
10296
match event {
@@ -110,14 +104,20 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
110104
Event::NewPeer { name, stream} => {
111105
let (client_sender, client_receiver) = mpsc::unbounded();
112106
peers.insert(name.clone(), client_sender);
113-
let handle = task::spawn(client_writer(client_receiver, stream));
114-
writers.push(handle);
107+
spawn_and_log_error(client_writer(client_receiver, stream));
115108
}
116109
}
117110
}
118-
drop(peers);
119-
for writer in writers {
120-
writer.await?;
121-
}
122111
Ok(())
123112
}
113+
114+
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
115+
where
116+
F: Future<Output = Result<()>> + Send + 'static,
117+
{
118+
task::spawn(async move {
119+
if let Err(e) = fut.await {
120+
eprintln!("{}", e)
121+
}
122+
})
123+
}

tutorial.adoc

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,42 @@ async fn client(stream: TcpStream) -> Result<()> {
215215

216216
<5> Finally, we parse each line into a list of destination logins and the message itself.
217217

218+
== Managing Errors
219+
220+
One serious problem in the above solution is that, while we correctly propagate errors in the `client`, we just drop the error on the floor afterwards!
221+
That is, `task::spawn` does not return error immediately (it can't, it needs to run the future to completion first), only after it is joined.
222+
We can "fix" it by waiting for the task to be joined, like this:
223+
224+
[source,rust]
225+
----
226+
let handle = task::spawn(client(stream)); <1>
227+
handle.await?
228+
----
229+
230+
The `.await` waits until the client finishes, and `?` propagates the result.
231+
232+
There are two problems with this solution however!
233+
_First_, because we immediately await the client, we can only handle one client at time, and that completely defeats the purpose of async!
234+
_Second_, if a client encounters an IO error, the whole server immediately exits.
235+
That is, a flaky internet connection of one peer brings down the whole chat room!
236+
237+
A correct way to handle client errors in this case is log them, and continue serving other clients.
238+
So let's use a helper function for this:
239+
240+
[source,rust]
241+
----
242+
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
243+
where
244+
F: Future<Output = Result<()>> + Send + 'static,
245+
{
246+
task::spawn(async move {
247+
if let Err(e) = fut.await {
248+
eprintln!("{}", e)
249+
}
250+
})
251+
}
252+
----
253+
218254
== Sending Messages
219255

220256
Now it's time to implement the other half -- sending messages.
@@ -295,7 +331,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
295331
Event::NewPeer { name, stream} => {
296332
let (client_sender, client_receiver) = mpsc::unbounded();
297333
peers.insert(name.clone(), client_sender); <4>
298-
let _handle = task::spawn(client_writer(client_receiver, stream)); <5>
334+
spawn_and_log_error(client_writer(client_receiver, stream)); <5>
299335
}
300336
}
301337
}
@@ -348,7 +384,7 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
348384
while let Some(stream) = incoming.next().await {
349385
let stream = stream?;
350386
println!("Accepting from: {}", stream.peer_addr()?);
351-
let _handle = task::spawn(client(broker_sender.clone(), stream));
387+
spawn_and_log_error(client(broker_sender.clone(), stream));
352388
}
353389
Ok(())
354390
}
@@ -422,7 +458,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
422458
Event::NewPeer { name, stream} => {
423459
let (client_sender, client_receiver) = mpsc::unbounded();
424460
peers.insert(name.clone(), client_sender);
425-
let _handle = task::spawn(client_writer(client_receiver, stream));
461+
spawn_and_log_error(client_writer(client_receiver, stream));
426462
}
427463
}
428464
}
@@ -466,18 +502,13 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
466502
467503
let (broker_sender, broker_receiver) = mpsc::unbounded();
468504
let broker = task::spawn(broker(broker_receiver));
469-
let mut readers = Vec::new();
470505
let mut incoming = listener.incoming();
471506
while let Some(stream) = incoming.next().await {
472507
let stream = stream?;
473508
println!("Accepting from: {}", stream.peer_addr()?);
474-
let handle = task::spawn(client(broker_sender.clone(), stream));
475-
readers.push(handle);
476-
}
477-
drop(broker_sender);
478-
for reader in readers {
479-
reader.await?; <1>
509+
spawn_and_log_error(client(broker_sender.clone(), stream));
480510
}
511+
drop(broker_sender); <1>
481512
broker.await?; <5>
482513
Ok(())
483514
}
@@ -503,13 +534,13 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
503534
Event::NewPeer { name, stream} => {
504535
let (client_sender, client_receiver) = mpsc::unbounded();
505536
peers.insert(name.clone(), client_sender);
506-
let handle = task::spawn(client_writer(client_receiver, stream));
537+
let handle = spawn_and_log_error(client_writer(client_receiver, stream));
507538
writers.push(handle);
508539
}
509540
}
510541
}
511542
drop(peers); <3>
512-
for writer in writers {
543+
for writer in writers { <4>
513544
writer.await?;
514545
}
515546
Ok(())
@@ -518,10 +549,10 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
518549

519550
Notice what happens with all of the channels once we exit the accept loop:
520551

521-
<1> First, we drop the main broker's sender and join all of the readers.
522-
When the readers are done, there's no sender for the broker's channel.
552+
<1> First, we drop the main broker's sender.
553+
That way when the readers are done, there's no sender for the broker's channel, and the chanel closes.
523554
<2> Next, the broker exits `while let Some(event) = events.next().await` loop.
524555
<3> It's crucial that, at this stage, we drop the `peers` map.
525556
This drops writer's senders.
526557
<4> Now we can join all of the writers.
527-
<5> Finally, we join the broker, which also gurantess that all the writes have terminated.
558+
<5> Finally, we join the broker, which also guarantees that all the writes have terminated.

0 commit comments

Comments
 (0)