Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit 116a641

Browse files
committed
update server
1 parent 3215540 commit 116a641

File tree

2 files changed

+105
-243
lines changed

2 files changed

+105
-243
lines changed

src/bin/server.rs

Lines changed: 105 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
#![feature(async_await)]
22

3-
use std::{collections::HashMap, net::ToSocketAddrs};
3+
use std::{
4+
net::ToSocketAddrs,
5+
sync::Arc,
6+
collections::hash_map::{HashMap, Entry},
7+
};
48

59
use futures::{
6-
io::{BufReader, WriteHalf, AsyncRead},
7-
stream::{FuturesUnordered, Stream},
8-
channel::mpsc::{self, unbounded},
10+
channel::mpsc,
911
SinkExt,
1012
select,
1113
};
1214

1315
use async_std::{
16+
io::BufReader,
1417
prelude::*,
1518
task,
1619
net::{TcpListener, TcpStream},
@@ -20,119 +23,156 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
2023
type Sender<T> = mpsc::UnboundedSender<T>;
2124
type Receiver<T> = mpsc::UnboundedReceiver<T>;
2225

26+
#[derive(Debug)]
27+
enum Void {}
28+
2329
fn main() -> Result<()> {
2430
task::block_on(server("127.0.0.1:8080"))
2531
}
2632

27-
#[derive(Debug)]
28-
enum Event {
29-
NewPeer {
30-
name: String,
31-
stream: WriteHalf<TcpStream>,
32-
},
33-
Message {
34-
from: String,
35-
to: Vec<String>,
36-
msg: String,
37-
},
38-
}
39-
4033
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
4134
let listener = TcpListener::bind(addr).await?;
4235

43-
let (broker_sender, broker_receiver) = unbounded::<Event>();
36+
let (broker_sender, broker_receiver) = mpsc::unbounded();
4437
let broker = task::spawn(broker(broker_receiver));
45-
let mut incoming = listener.incoming().fuse();
46-
let mut tasks = FuturesUnordered::new();
47-
loop {
48-
let stream = select! {
49-
stream = incoming.next() => stream,
50-
res = tasks.next() => {
51-
if let Some(res) = res {
52-
res?
53-
}
54-
continue;
55-
},
56-
complete => break,
57-
};
58-
59-
let stream = stream.unwrap()?;
38+
let mut incoming = listener.incoming();
39+
while let Some(stream) = incoming.next().await {
40+
let stream = stream?;
6041
println!("Accepting from: {}", stream.peer_addr()?);
61-
62-
let handle = task::spawn(client(broker_sender.clone(), stream));
63-
tasks.push(handle);
42+
spawn_and_log_error(client(broker_sender.clone(), stream));
6443
}
65-
broker.await?;
44+
drop(broker_sender);
45+
broker.await;
6646
Ok(())
6747
}
6848

6949
async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
70-
let (reader, writer) = stream.split();
71-
let reader = BufReader::new(reader);
50+
let stream = Arc::new(stream);
51+
let reader = BufReader::new(&*stream);
7252
let mut lines = reader.lines();
53+
7354
let name = match lines.next().await {
7455
None => Err("peer disconnected immediately")?,
7556
Some(line) => line?,
7657
};
77-
broker.send(Event::NewPeer { name: name.clone(), stream: writer }).await.unwrap();
58+
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
59+
broker.send(Event::NewPeer {
60+
name: name.clone(),
61+
stream: Arc::clone(&stream),
62+
shutdown: shutdown_receiver,
63+
}).await.unwrap();
7864

7965
while let Some(line) = lines.next().await {
8066
let line = line?;
8167
let (dest, msg) = match line.find(':') {
8268
None => continue,
8369
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
8470
};
71+
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
72+
let msg: String = msg.trim().to_string();
73+
8574
broker.send(Event::Message {
8675
from: name.clone(),
87-
to: dest.split(',').map(|name| name.trim().to_string()).collect(),
88-
msg: msg.trim().to_string(),
76+
to: dest,
77+
msg,
8978
}).await.unwrap();
9079
}
80+
9181
Ok(())
9282
}
9383

9484
async fn client_writer(
95-
mut receiver: Receiver<String>,
96-
mut writer: WriteHalf<TcpStream>,
85+
messages: &mut Receiver<String>,
86+
stream: Arc<TcpStream>,
87+
mut shutdown: Receiver<Void>,
9788
) -> Result<()> {
98-
while let Some(msg) = receiver.next().await {
99-
writer.write_all(msg.as_bytes()).await?;
89+
let mut stream = &*stream;
90+
loop {
91+
select! {
92+
msg = messages.next() => match msg {
93+
Some(msg) => stream.write_all(msg.as_bytes()).await?,
94+
None => break,
95+
},
96+
void = shutdown.next() => match void {
97+
Some(void) => match void {},
98+
None => break,
99+
}
100+
}
100101
}
101102
Ok(())
102103
}
103104

104-
async fn broker(mut events: Receiver<Event>) -> Result<()> {
105+
#[derive(Debug)]
106+
enum Event {
107+
NewPeer {
108+
name: String,
109+
stream: Arc<TcpStream>,
110+
shutdown: Receiver<Void>,
111+
},
112+
Message {
113+
from: String,
114+
to: Vec<String>,
115+
msg: String,
116+
},
117+
}
118+
119+
async fn broker(mut events: Receiver<Event>) {
120+
let (disconnect_sender, mut disconnect_receiver) =
121+
mpsc::unbounded::<(String, Receiver<String>)>();
105122
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
106-
let mut tasks = FuturesUnordered::new();
107123

108124
loop {
109125
let event = select! {
110-
event = events.next() => event,
111-
res = tasks.next() => {
112-
if let Some(res) = res {
113-
res?;
114-
}
126+
event = events.next() => match event {
127+
None => break,
128+
Some(event) => event,
129+
},
130+
disconnect = disconnect_receiver.next() => {
131+
let (name, _pending_messages) = disconnect.unwrap();
132+
assert!(peers.remove(&name).is_some());
115133
continue;
116-
}
117-
complete => break,
134+
},
118135
};
119-
120136
match event {
121-
Some(Event::Message { from, to, msg } )=> {
137+
Event::Message { from, to, msg } => {
122138
for addr in to {
123139
if let Some(peer) = peers.get_mut(&addr) {
124-
peer.send(format!("from {}: {}\n", from, msg)).await?
140+
peer.send(format!("from {}: {}\n", from, msg)).await
141+
.unwrap()
125142
}
126143
}
127144
}
128-
Some(Event::NewPeer { name, stream}) => {
129-
let (client_sender, client_receiver) = unbounded();
130-
peers.insert(name.clone(), client_sender);
131-
let handle = task::spawn(client_writer(client_receiver, stream));
132-
tasks.push(handle)
145+
Event::NewPeer { name, stream, shutdown } => {
146+
match peers.entry(name.clone()) {
147+
Entry::Occupied(..) => (),
148+
Entry::Vacant(entry) => {
149+
let (client_sender, mut client_receiver) = mpsc::unbounded();
150+
entry.insert(client_sender);
151+
let mut disconnect_sender = disconnect_sender.clone();
152+
spawn_and_log_error(async move {
153+
let res = client_writer(&mut client_receiver, stream, shutdown).await;
154+
disconnect_sender.send((name, client_receiver)).await
155+
.unwrap();
156+
res
157+
});
158+
}
159+
}
133160
}
134-
None => continue,
135161
}
136162
}
137-
Ok(())
163+
drop(peers);
164+
drop(disconnect_sender);
165+
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
166+
}
167+
}
168+
169+
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
170+
where
171+
F: Future<Output = Result<()>> + Send + 'static,
172+
{
173+
task::spawn(async move {
174+
if let Err(e) = fut.await {
175+
eprintln!("{}", e)
176+
}
177+
})
138178
}

0 commit comments

Comments
 (0)