Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit a058073

Browse files
committed
graceful shutdown
1 parent 9efea28 commit a058073

File tree

3 files changed

+246
-23
lines changed

3 files changed

+246
-23
lines changed

src/main.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,16 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
2626

2727
let (broker_sender, broker_receiver) = mpsc::unbounded();
2828
let broker = task::spawn(broker(broker_receiver));
29+
let mut readers = Vec::new();
2930
let mut incoming = listener.incoming();
3031
while let Some(stream) = incoming.next().await {
3132
let stream = stream?;
3233
println!("Accepting from: {}", stream.peer_addr()?);
33-
let _handle = task::spawn(client(broker_sender.clone(), stream));
34+
let handle = task::spawn(client(broker_sender.clone(), stream));
35+
readers.push(handle);
36+
}
37+
for reader in readers {
38+
reader.await?;
3439
}
3540
broker.await?;
3641
Ok(())
@@ -91,6 +96,7 @@ enum Event {
9196

9297
async fn broker(mut events: Receiver<Event>) -> Result<()> {
9398
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
99+
let mut writers = Vec::new();
94100

95101
while let Some(event) = events.next().await {
96102
match event {
@@ -104,9 +110,14 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
104110
Event::NewPeer { name, stream} => {
105111
let (client_sender, client_receiver) = mpsc::unbounded();
106112
peers.insert(name.clone(), client_sender);
107-
let _handle = task::spawn(client_writer(client_receiver, stream));
113+
let handle = task::spawn(client_writer(client_receiver, stream));
114+
writers.push(handle);
108115
}
109116
}
110117
}
118+
drop(peers);
119+
for writer in writers {
120+
writer.await?;
121+
}
111122
Ok(())
112123
}

tutorial.adoc

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,27 +343,26 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
343343
let listener = TcpListener::bind(addr).await?;
344344
345345
let (broker_sender, broker_receiver) = mpsc::unbounded(); <1>
346-
let broker = task::spawn(broker(broker_receiver));
346+
let _broker_handle = task::spawn(broker(broker_receiver));
347347
let mut incoming = listener.incoming();
348348
while let Some(stream) = incoming.next().await {
349349
let stream = stream?;
350350
println!("Accepting from: {}", stream.peer_addr()?);
351351
let _handle = task::spawn(client(broker_sender.clone(), stream));
352352
}
353-
broker.await?; <2>
354353
Ok(())
355354
}
356355
357356
async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
358-
let stream = Arc::new(stream); <3>
357+
let stream = Arc::new(stream); <2>
359358
let reader = BufReader::new(&*stream);
360359
let mut lines = reader.lines();
361360
362361
let name = match lines.next().await {
363362
None => Err("peer disconnected immediately")?,
364363
Some(line) => line?,
365364
};
366-
broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await <4>
365+
broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await <3>
367366
.unwrap();
368367
369368
while let Some(line) = lines.next().await {
@@ -375,7 +374,7 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
375374
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
376375
let msg: String = msg.trim().to_string();
377376
378-
broker.send(Event::Message { <5>
377+
broker.send(Event::Message { <4>
379378
from: name.clone(),
380379
to: dest,
381380
msg,
@@ -432,9 +431,97 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
432431
----
433432

434433
<1> Inside the `server`, we create broker's channel and `task`.
435-
<2> After ``server``'s end, we join broker to make sure all pending messages are delivered.
436-
Note that this doesn't quite do the trick yet, as we are not joining readers and writers themselves.
437-
<3> Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`.
438-
<4> On login, we notify the broker.
434+
<2> Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`.
435+
<3> On login, we notify the broker.
439436
Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
440-
<5> Similarly, we forward parsed messages to the broker, assuming that it is alive.
437+
<4> Similarly, we forward parsed messages to the broker, assuming that it is alive.
438+
439+
== Clean Shutdown
440+
441+
On of the problems of the current implementation is that it doesn't handle graceful shutdown.
442+
If we break from the accept loop for some reason, all in-flight tasks are just dropped on the floor.
443+
A more correct shutdown sequence would be:
444+
445+
. Stop accepting new clients
446+
. Deliver all pending messages
447+
. Exit the process
448+
449+
A clean shutdown in a channel based architecture is easy, although it can appear a magic trick at first.
450+
In Rust, receiver side of a channel is closed as soon as all senders are dropped.
451+
That is, as soon as producers exit and drop their senders, the rest of the system shutdowns naturally.
452+
In `async_std` this translates to two rules:
453+
454+
. Make sure that channels form an acyclic graph.
455+
. Take care to wait, in the correct order, until intermediate layers of the system process pending messages.
456+
457+
In `a-chat`, we already have an unidirectional flow of messages: `reader -> broker -> writer`.
458+
However, we never wait for broker and writers, which might cause some messages to get dropped.
459+
Let's add waiting to the server:
460+
461+
462+
[source,rust]
463+
----
464+
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
465+
let listener = TcpListener::bind(addr).await?;
466+
467+
let (broker_sender, broker_receiver) = mpsc::unbounded();
468+
let broker = task::spawn(broker(broker_receiver));
469+
let mut readers = Vec::new();
470+
let mut incoming = listener.incoming();
471+
while let Some(stream) = incoming.next().await {
472+
let stream = stream?;
473+
println!("Accepting from: {}", stream.peer_addr()?);
474+
let handle = task::spawn(client(broker_sender.clone(), stream));
475+
readers.push(handle);
476+
}
477+
drop(broker_sender);
478+
for reader in readers {
479+
reader.await?; <1>
480+
}
481+
broker.await?; <5>
482+
Ok(())
483+
}
484+
----
485+
486+
And to the broker:
487+
488+
[source,rust]
489+
----
490+
async fn broker(mut events: Receiver<Event>) -> Result<()> {
491+
let mut writers = Vec::new();
492+
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
493+
494+
while let Some(event) = events.next().await { <2>
495+
match event {
496+
Event::Message { from, to, msg } => {
497+
for addr in to {
498+
if let Some(peer) = peers.get_mut(&addr) {
499+
peer.send(format!("from {}: {}\n", from, msg)).await?
500+
}
501+
}
502+
}
503+
Event::NewPeer { name, stream} => {
504+
let (client_sender, client_receiver) = mpsc::unbounded();
505+
peers.insert(name.clone(), client_sender);
506+
let handle = task::spawn(client_writer(client_receiver, stream));
507+
writers.push(handle);
508+
}
509+
}
510+
}
511+
drop(peers); <3>
512+
for writer in writers {
513+
writer.await?;
514+
}
515+
Ok(())
516+
}
517+
----
518+
519+
Notice what happens with all of the channels once we exit the accept loop:
520+
521+
<1> First, we drop the main broker's sender and join all of the readers.
522+
When the readers are done, there's no sender for the broker's channel.
523+
<2> Next, the broker exits `while let Some(event) = events.next().await` loop.
524+
<3> It's crucial that, at this stage, we drop the `peers` map.
525+
This drops writer's senders.
526+
<4> Now we can join all of the writers.
527+
<5> Finally, we join the broker, which also gurantess that all the writes have terminated.

0 commit comments

Comments
 (0)