Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit 9efea28

Browse files
committed
simple broker
1 parent 911d372 commit 9efea28

File tree

3 files changed

+555
-5
lines changed

3 files changed

+555
-5
lines changed

src/main.rs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#![feature(async_await)]
22

3-
use std::{net::ToSocketAddrs, sync::Arc};
3+
use std::{net::ToSocketAddrs, sync::Arc, collections::HashMap};
4+
5+
use futures::channel::mpsc;
6+
use futures::SinkExt;
47

58
use async_std::{
69
io::BufReader,
@@ -10,31 +13,39 @@ use async_std::{
1013
};
1114

1215
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
16+
type Sender<T> = mpsc::UnboundedSender<T>;
17+
type Receiver<T> = mpsc::UnboundedReceiver<T>;
18+
1319

1420
fn main() -> Result<()> {
1521
task::block_on(server("127.0.0.1:8080"))
1622
}
1723

1824
/// Binds a TCP listener on `addr`, spawns the broker task, and accepts
/// connections in a loop, spawning one reader task (`client`) per peer.
///
/// Each reader task gets a clone of `broker_sender` so it can forward
/// `Event`s to the single broker.
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;

    let (broker_sender, broker_receiver) = mpsc::unbounded();
    let broker = task::spawn(broker(broker_receiver));
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        let _handle = task::spawn(client(broker_sender.clone(), stream));
    }
    // Drop the original sender so the broker's receiver observes channel
    // closure once every per-client clone is gone. Without this, the
    // `broker.await` below could never complete even after the accept
    // loop ended, because this handle would keep the channel open forever.
    drop(broker_sender);
    broker.await?;
    Ok(())
}
2838

29-
async fn client(stream: TcpStream) -> Result<()> {
30-
let reader = BufReader::new(&stream);
39+
/// Per-connection reader task.
///
/// Performs the login handshake (the first line a peer sends is its name),
/// registers the peer with the broker, then forwards every subsequent
/// `dest1, dest2: message` line to the broker as an `Event::Message`.
/// Lines without a colon are ignored.
async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
    // Shared with `client_writer` via the broker, hence the Arc.
    let stream = Arc::new(stream);
    let reader = BufReader::new(&*stream);
    let mut lines = reader.lines();

    // First line is the peer's name.
    let name = match lines.next().await {
        Some(first_line) => first_line?,
        None => Err("peer disconnected immediately")?,
    };
    let login = Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) };
    // The broker is expected to outlive every client; if the send fails the
    // broker most likely panicked, so we escalate the panic here as well.
    broker.send(login).await.unwrap();

    while let Some(line) = lines.next().await {
        let line = line?;
        // Skip lines that don't look like `destinations: message`.
        let colon = match line.find(':') {
            Some(idx) => idx,
            None => continue,
        };
        let recipients: Vec<String> = line[..colon]
            .split(',')
            .map(|name| name.trim().to_string())
            .collect();
        let text: String = line[colon + 1..].trim().to_string();

        let event = Event::Message {
            from: name.clone(),
            to: recipients,
            msg: text,
        };
        broker.send(event).await.unwrap();
    }
    Ok(())
}
67+
68+
async fn client_writer(
69+
mut messages: Receiver<String>,
70+
stream: Arc<TcpStream>,
71+
) -> Result<()> {
72+
let mut stream = &*stream;
73+
while let Some(msg) = messages.next().await {
74+
stream.write_all(msg.as_bytes()).await?;
75+
}
76+
Ok(())
77+
}
78+
79+
#[derive(Debug)]
80+
enum Event {
81+
NewPeer {
82+
name: String,
83+
stream: Arc<TcpStream>,
84+
},
85+
Message {
86+
from: String,
87+
to: Vec<String>,
88+
msg: String,
89+
},
90+
}
91+
92+
async fn broker(mut events: Receiver<Event>) -> Result<()> {
93+
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
94+
95+
while let Some(event) = events.next().await {
96+
match event {
97+
Event::Message { from, to, msg } => {
98+
for addr in to {
99+
if let Some(peer) = peers.get_mut(&addr) {
100+
peer.send(format!("from {}: {}\n", from, msg)).await?
101+
}
102+
}
103+
}
104+
Event::NewPeer { name, stream} => {
105+
let (client_sender, client_receiver) = mpsc::unbounded();
106+
peers.insert(name.clone(), client_sender);
107+
let _handle = task::spawn(client_writer(client_receiver, stream));
108+
}
109+
}
47110
}
48111
Ok(())
49112
}

tutorial.adoc

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,218 @@ That way, a client can directly `.write_all` a message to recipients.
223223
However, this would be wrong: if Alice sends `bob: foo`, and Charley sends `bob: bar`, Bob might actually receive `fobaor`.
224224
Sending a message over a socket might require several syscalls, so two concurrent ``.write_all``'s might interfere with each other!
225225

226+
As a rule of thumb, only a single task should write to each `TcpStream`.
227+
So let's create a `client_writer` task which receives messages over a channel and writes them to the socket.
228+
This task would be the point of serialization of messages.
229+
If Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel.
226230

231+
[source,rust]
232+
----
233+
use futures::channel::mpsc; <1>
234+
use futures::SinkExt;
235+
236+
type Sender<T> = mpsc::UnboundedSender<T>; <2>
237+
type Receiver<T> = mpsc::UnboundedReceiver<T>;
238+
239+
async fn client_writer(
240+
mut messages: Receiver<String>,
241+
stream: Arc<TcpStream>, <3>
242+
) -> Result<()> {
243+
let mut stream = &*stream;
244+
while let Some(msg) = messages.next().await {
245+
stream.write_all(msg.as_bytes()).await?;
246+
}
247+
Ok(())
248+
}
249+
----
250+
251+
<1> We will use channels from the `futures` crate.
252+
<2> For simplicity, we will use `unbounded` channels, and won't be discussing backpressure in this tutorial.
253+
<3> As `client` and `client_writer` share the same `TcpStream`, we need to put it into an `Arc`.
254+
Note that because `client` only reads from the stream and `client_writer` only writes to it, we don't get a race here.
255+
256+
257+
== Connecting Readers and Writers
258+
259+
So how do we make sure that messages read in `client` flow into the relevant `client_writer`?
260+
We should somehow maintain an `peers: HashMap<String, Sender<String>>` map which allows a client to find destination channels.
261+
However, this map would be a bit of shared mutable state, so we'd have to wrap it in an `RwLock` and answer tough questions about what should happen if a client joins at the same moment as it receives a message.
262+
263+
One trick to make reasoning about state simpler comes from the actor model.
264+
We can create a dedicated broker task which owns the `peers` map and communicates with other tasks by channels.
265+
By hiding `peers` inside such an "actor" task, we remove the need for mutexes and also make the serialization point explicit.
266+
The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue.
267+
268+
[source,rust]
269+
----
270+
#[derive(Debug)]
271+
enum Event { <1>
272+
NewPeer {
273+
name: String,
274+
stream: Arc<TcpStream>,
275+
},
276+
Message {
277+
from: String,
278+
to: Vec<String>,
279+
msg: String,
280+
},
281+
}
282+
283+
async fn broker(mut events: Receiver<Event>) -> Result<()> {
284+
let mut peers: HashMap<String, Sender<String>> = HashMap::new(); <2>
285+
286+
while let Some(event) = events.next().await {
287+
match event {
288+
Event::Message { from, to, msg } => { <3>
289+
for addr in to {
290+
if let Some(peer) = peers.get_mut(&addr) {
291+
peer.send(format!("from {}: {}\n", from, msg)).await?
292+
}
293+
}
294+
}
295+
Event::NewPeer { name, stream} => {
296+
let (client_sender, client_receiver) = mpsc::unbounded();
297+
peers.insert(name.clone(), client_sender); <4>
298+
let _handle = task::spawn(client_writer(client_receiver, stream)); <5>
299+
}
300+
}
301+
}
302+
Ok(())
303+
}
304+
----
305+
306+
<1> The broker should handle two types of events: a message or the arrival of a new peer.
307+
<2> Internal state of the broker is a `HashMap`.
308+
Note how we don't need a `Mutex` here and can confidently say, at each iteration of the broker's loop, what the current set of peers is.
309+
<3> To handle a message, we send it over a channel to each destination.
310+
<4> To handle a new peer, we first register it in the `peers` map ...
311+
<5> ... and then spawn a dedicated task to actually write the messages to the socket.
312+
313+
== All Together
314+
315+
At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat:
316+
317+
[source,rust]
318+
----
319+
#![feature(async_await)]
320+
321+
use std::{net::ToSocketAddrs, sync::Arc, collections::HashMap};
322+
323+
use futures::channel::mpsc;
324+
use futures::SinkExt;
325+
326+
use async_std::{
327+
io::BufReader,
328+
prelude::*,
329+
task,
330+
net::{TcpListener, TcpStream},
331+
};
332+
333+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
334+
type Sender<T> = mpsc::UnboundedSender<T>;
335+
type Receiver<T> = mpsc::UnboundedReceiver<T>;
336+
337+
338+
fn main() -> Result<()> {
339+
task::block_on(server("127.0.0.1:8080"))
340+
}
341+
342+
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
343+
let listener = TcpListener::bind(addr).await?;
344+
345+
let (broker_sender, broker_receiver) = mpsc::unbounded(); <1>
346+
let broker = task::spawn(broker(broker_receiver));
347+
let mut incoming = listener.incoming();
348+
while let Some(stream) = incoming.next().await {
349+
let stream = stream?;
350+
println!("Accepting from: {}", stream.peer_addr()?);
351+
let _handle = task::spawn(client(broker_sender.clone(), stream));
352+
}
353+
broker.await?; <2>
354+
Ok(())
355+
}
356+
357+
async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
358+
let stream = Arc::new(stream); <3>
359+
let reader = BufReader::new(&*stream);
360+
let mut lines = reader.lines();
361+
362+
let name = match lines.next().await {
363+
None => Err("peer disconnected immediately")?,
364+
Some(line) => line?,
365+
};
366+
broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await <4>
367+
.unwrap();
368+
369+
while let Some(line) = lines.next().await {
370+
let line = line?;
371+
let (dest, msg) = match line.find(':') {
372+
None => continue,
373+
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
374+
};
375+
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
376+
let msg: String = msg.trim().to_string();
377+
378+
broker.send(Event::Message { <5>
379+
from: name.clone(),
380+
to: dest,
381+
msg,
382+
}).await.unwrap();
383+
}
384+
Ok(())
385+
}
386+
387+
async fn client_writer(
388+
mut messages: Receiver<String>,
389+
stream: Arc<TcpStream>,
390+
) -> Result<()> {
391+
let mut stream = &*stream;
392+
while let Some(msg) = messages.next().await {
393+
stream.write_all(msg.as_bytes()).await?;
394+
}
395+
Ok(())
396+
}
397+
398+
#[derive(Debug)]
399+
enum Event {
400+
NewPeer {
401+
name: String,
402+
stream: Arc<TcpStream>,
403+
},
404+
Message {
405+
from: String,
406+
to: Vec<String>,
407+
msg: String,
408+
},
409+
}
410+
411+
async fn broker(mut events: Receiver<Event>) -> Result<()> {
412+
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
413+
414+
while let Some(event) = events.next().await {
415+
match event {
416+
Event::Message { from, to, msg } => {
417+
for addr in to {
418+
if let Some(peer) = peers.get_mut(&addr) {
419+
peer.send(format!("from {}: {}\n", from, msg)).await?
420+
}
421+
}
422+
}
423+
Event::NewPeer { name, stream} => {
424+
let (client_sender, client_receiver) = mpsc::unbounded();
425+
peers.insert(name.clone(), client_sender);
426+
let _handle = task::spawn(client_writer(client_receiver, stream));
427+
}
428+
}
429+
}
430+
Ok(())
431+
}
432+
----
433+
434+
<1> Inside the `server`, we create broker's channel and `task`.
435+
<2> After ``server``'s end, we join the broker to make sure all pending messages are delivered.
436+
Note that this doesn't quite do the trick yet, as we are not joining readers and writers themselves.
437+
<3> Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`.
438+
<4> On login, we notify the broker.
439+
Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
440+
<5> Similarly, we forward parsed messages to the broker, assuming that it is alive.

0 commit comments

Comments
 (0)