Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit 911d372

Browse files
committed
start tutorial
1 parent 381c893 commit 911d372

File tree

5 files changed

+1103
-2
lines changed

5 files changed

+1103
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/bin/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
use std::{collections::HashMap, net::ToSocketAddrs};
44

55
use futures::{
6-
io::{BufReader, WriteHalf},
7-
stream::FuturesUnordered,
6+
io::{BufReader, WriteHalf, AsyncRead},
7+
stream::{FuturesUnordered, Stream},
88
channel::mpsc::{self, unbounded},
99
SinkExt,
1010
select,

src/main.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#![feature(async_await)]
2+
3+
use std::{net::ToSocketAddrs, sync::Arc};
4+
5+
use async_std::{
6+
io::BufReader,
7+
prelude::*,
8+
task,
9+
net::{TcpListener, TcpStream},
10+
};
11+
12+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
13+
14+
fn main() -> Result<()> {
15+
task::block_on(server("127.0.0.1:8080"))
16+
}
17+
18+
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
19+
let listener = TcpListener::bind(addr).await?;
20+
let mut incoming = listener.incoming();
21+
while let Some(stream) = incoming.next().await {
22+
let stream = stream?;
23+
println!("Accepting from: {}", stream.peer_addr()?);
24+
let _handle = task::spawn(client(stream));
25+
}
26+
Ok(())
27+
}
28+
29+
async fn client(stream: TcpStream) -> Result<()> {
30+
let reader = BufReader::new(&stream);
31+
let mut lines = reader.lines();
32+
33+
let name = match lines.next().await {
34+
None => Err("peer disconnected immediately")?,
35+
Some(line) => line?,
36+
};
37+
println!("name = {}", name);
38+
39+
while let Some(line) = lines.next().await {
40+
let line = line?;
41+
let (dest, msg) = match line.find(':') {
42+
None => continue,
43+
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
44+
};
45+
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
46+
let msg: String = msg.trim().to_string();
47+
}
48+
Ok(())
49+
}

tutorial.adoc

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
= a-chat tutorial
2+
:icons: font
3+
:source-highlighter: pygments
4+
:pygments-style: borland
5+
6+
:source-language: rust
7+
8+
In this tutorial, we will implement an asynchronous chat on top of async-std.
9+
10+
== Specification
11+
12+
The chat uses a simple text protocol over TCP.
13+
Protocol consists of utf-8 messages, separated by `\n`.
14+
15+
The client connects to the server and sends login as a first line.
16+
After that, the client can send messages to other clients using the following syntax:
17+
18+
[source]
19+
----
20+
login1, login2, ... login2: message
21+
----
22+
23+
Each of the specified clients than receives a `from login: message` message.
24+
25+
A possible session might look like this
26+
27+
[cols="2",frame=none,grid=none]
28+
|===
29+
a|
30+
.alice
31+
----
32+
> alice
33+
> bob: hello
34+
35+
36+
< from bob: hi!
37+
----
38+
39+
a|
40+
.bob
41+
----
42+
> bob
43+
44+
< from alice: hello
45+
> alice, bob: hi!
46+
< from bob: hi!
47+
----
48+
49+
|===
50+
51+
The main challenge for the chat server is keeping track of many concurrent connections.
52+
The main challenge for the chat client is managing concurrent outgoing messages, incoming messages and user's typing.
53+
54+
== Getting Started
55+
56+
Let's create a new Cargo project:
57+
58+
[source]
59+
----
60+
$ cargo new a-chat
61+
$ cd a-chat
62+
----
63+
64+
At the moment `async-std` requires nightly, so let's add a rustup override for convenience:
65+
66+
[source]
67+
----
68+
$ rustup override add nightly
69+
$ rustc --version
70+
rustc 1.38.0-nightly (c4715198b 2019-08-05)
71+
----
72+
73+
== Accept Loop
74+
75+
Let's implement the scaffold of the server: a loop that binds a TCP socket to an address and starts accepting connections.
76+
77+
78+
First of all, let's add required import boilerplate:
79+
80+
[source,rust]
81+
----
82+
#![feature(async_await)]
83+
84+
use std::net::ToSocketAddrs; <1>
85+
86+
use async_std::{
87+
prelude::*, <2>
88+
task, <3>
89+
net::TcpListener, <4>
90+
};
91+
92+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; <5>
93+
----
94+
95+
<1> `async_std` uses `std` types where appropriate.
96+
We'll need `ToSocketAddrs` to specify address to listen on.
97+
<2> `prelude` re-exports some traits required to work with futures and streams
98+
<3> The `task` module roughtly corresponds to `std::thread` module, but tasks are much lighter weight.
99+
A single thread can run many tasks.
100+
<4> For the socket type, we use `TcpListener` from `async_std`, which is just like `std::net::TcpListener`, but is non-blocking and uses `async` API.
101+
<5> We will skip implementing comprehensive error handling in this example.
102+
To propagate the errors, we will use a boxed error trait object.
103+
+
104+
NOTE: Do you know that there's `From<&'_ str> for Box<dyn Error>` implementation in
105+
stdlib, which allows you to use strings with `?` operator?
106+
107+
108+
Now we can write the server's accept loop:
109+
110+
[source,rust]
111+
----
112+
async fn server(addr: impl ToSocketAddrs) -> Result<()> { <1>
113+
let listener = TcpListener::bind(addr).await?; <2>
114+
let mut incoming = listener.incoming();
115+
while let Some(stream) = incoming.next().await { <3>
116+
// TODO
117+
}
118+
Ok(())
119+
}
120+
----
121+
122+
<1> We mark `server` function as `async`, which allows us to use `.await` syntax inside.
123+
<2> `TcpListener::bind` call returns a future, which we `.await` to extract the `Result`, and then `?` to get a `TcpListener`.
124+
Note how `.await` and `?` work nicely together.
125+
This is exactly how `std::net::TcpListener` works, but with `.await` added.
126+
Mirroring API of `std` is an explicit design goal of `async_std`.
127+
<3> Here, we would like to iterate incoming sockets, just how one would do in `std`:
128+
+
129+
[source,rust]
130+
----
131+
let listener: std::net::TcpListener = unimplemented!();
132+
for stream in listener.incoming() {
133+
134+
}
135+
----
136+
+
137+
Unfortunately this doesn't quite work with `async` yet, because there's no support for `async` for-loops in the language yet.
138+
For this reason we have to implement the loop manually, by using `while let Some(item) = iter.next().await` pattern.
139+
140+
Finally, let's add main:
141+
142+
[source,rust]
143+
----
144+
fn main() -> Result<()> {
145+
let fut = server("127.0.0.1:8080");
146+
task::block_on(fut)
147+
}
148+
----
149+
150+
The crucial thing to realise that is in Rust, unlike other languages, calling an async function does **not** run any code.
151+
Async functions only construct futures, which are inert state machines.
152+
To start stepping through the future state-machine in an async function, you should use `.await`.
153+
In a non-async function, a way to execute a future is to handle it to the executor.
154+
In this case, we use `task::block_on` to execute future on the current thread and block until it's done.
155+
156+
== Receiving messages
157+
158+
Let's implement the receiving part of the protocol.
159+
We need to:
160+
161+
. split incoming `TcpStream` on `\n` and decode bytes as utf-8
162+
. interpret the first line as a login
163+
. parse the rest of the lines as a `login: message`
164+
165+
166+
[source]
167+
----
168+
use async_std::net::TcpStream;
169+
170+
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
171+
let listener = TcpListener::bind(addr).await?;
172+
let mut incoming = listener.incoming();
173+
while let Some(stream) = incoming.next().await {
174+
let stream = stream?;
175+
println!("Accepting from: {}", stream.peer_addr()?);
176+
let _handle = task::spawn(client(stream)); <1>
177+
}
178+
Ok(())
179+
}
180+
181+
async fn client(stream: TcpStream) -> Result<()> {
182+
let reader = BufReader::new(&stream); <2>
183+
let mut lines = reader.lines();
184+
185+
let name = match lines.next().await { <3>
186+
None => Err("peer disconnected immediately")?,
187+
Some(line) => line?,
188+
};
189+
println!("name = {}", name);
190+
191+
while let Some(line) = lines.next().await { <4>
192+
let line = line?;
193+
let (dest, msg) = match line.find(':') { <5>
194+
None => continue,
195+
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
196+
};
197+
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
198+
let msg: String = msg.trim().to_string();
199+
}
200+
Ok(())
201+
}
202+
----
203+
204+
<1> We use `task::spawn` function to spawn an independent task for working with each client.
205+
That is, after accepting the client the `server` loop immediately starts waiting for the next one.
206+
This is the core benefit of event-driven architecture: we serve many number of clients concurrently, without spending many hardware threads.
207+
208+
<2> Luckily, the "split byte stream into lines" functionality is already implemented.
209+
`.lines()` call returns a stream of ``String``'s.
210+
TODO: show how one would implement `lines` by hand?
211+
212+
<3> We get the first line -- login
213+
214+
<4> And, once again, we implement a manual async for loop.
215+
216+
<5> Finally, we parse each line into a list of destination logins and the message itself.
217+
218+
== Sending Messages
219+
220+
Now it's time to implement the other half -- sending messages.
221+
A most obvious way to implement sending is to give each `client` access to the write half of `TcpStream` of each other clients.
222+
That way, a client can directly `.write_all` a message to recipients.
223+
However, this would be wrong: if Alice sends `bob: foo`, and Charley sends `bob: bar`, Bob might actually receive `fobaor`.
224+
Sending a message over a socket might require several syscalls, so two concurrent ``.write_all``'s might interfere with each other!
225+
226+

0 commit comments

Comments
 (0)