Skip to content

Commit db93ee7

Browse files
committed
samples: embassy: Update Embassy Demo for Zephyr executor
Update the embassy demo to use the Zephyr executor. This performs a ping-pong test using two executors, with one end of the responder running on another thread. On the rp2040, the round trip time with `executor-threaded` is about 12us, with `executor-zephyr`, all on a single thread, it is about 15us, and with pairs of tasks running across different threads, the time is about 26us. Signed-off-by: David Brown <[email protected]>
1 parent a995839 commit db93ee7

File tree

2 files changed

+288
-22
lines changed

2 files changed

+288
-22
lines changed

samples/embassy/Cargo.toml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@ crate-type = ["staticlib"]
1616
zephyr = { version = "0.1.0", features = ["time-driver"] }
1717
log = "0.4.22"
1818
static_cell = "2.1"
19+
heapless = "0.8"
1920

2021
[dependencies.embassy-executor]
2122
version = "0.7.0"
2223
# path = "../../embassy/embassy-executor"
2324
features = [
2425
"log",
2526
"task-arena-size-1024",
26-
"arch-cortex-m",
27-
"executor-thread",
2827
]
2928

3029
[dependencies.embassy-futures]
@@ -44,6 +43,19 @@ features = ["tick-hz-10_000"]
4443
[dependencies.critical-section]
4544
version = "1.2"
4645

46+
[features]
47+
# default = ["executor-thread"]
48+
default = ["executor-zephyr"]
49+
50+
executor-thread = [
51+
"embassy-executor/arch-cortex-m",
52+
"embassy-executor/executor-thread",
53+
]
54+
55+
executor-zephyr = [
56+
"zephyr/executor-zephyr",
57+
]
58+
4759
[profile.dev]
4860
opt-level = 1
4961

samples/embassy/src/lib.rs

Lines changed: 274 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,303 @@
33

44
#![no_std]
55

6-
use embassy_executor::{Executor, Spawner};
6+
extern crate alloc;
7+
8+
use core::ffi::c_int;
9+
10+
#[cfg(feature = "executor-thread")]
11+
use embassy_executor::Executor;
12+
13+
#[cfg(feature = "executor-zephyr")]
14+
use zephyr::embassy::Executor;
15+
16+
use alloc::format;
17+
use embassy_executor::{SendSpawner, Spawner};
718
use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel};
8-
use embassy_time::{Duration, Timer};
919
use log::info;
1020
use static_cell::StaticCell;
21+
use zephyr::{kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, kobj_define, printkln, raw::k_cycle_get_64};
22+
use zephyr::raw;
23+
24+
/// Maximum number of threads to spawn. As this is async, these do not each need a stack.
25+
const NUM_THREADS: usize = 6;
26+
27+
const THREAD_STACK_SIZE: usize = 2048;
1128

1229
static EXECUTOR_LOW: StaticCell<Executor> = StaticCell::new();
30+
static EXECUTOR_MAIN: StaticCell<Executor> = StaticCell::new();
31+
32+
static LOW_SPAWNER: Channel<CriticalSectionRawMutex, SendSpawner, 1> = Channel::new() ;
33+
34+
// The main thread priority.
35+
const MAIN_PRIO: c_int = 2;
36+
const LOW_PRIO: c_int = 5;
1337

1438
#[no_mangle]
1539
extern "C" fn rust_main() {
1640
unsafe {
1741
zephyr::set_logger().unwrap();
1842
}
1943

20-
info!("Hello world from Rust on {}", zephyr::kconfig::CONFIG_BOARD);
44+
// Set our own priority.
45+
unsafe {
46+
raw::k_thread_priority_set(raw::k_current_get(), MAIN_PRIO);
47+
}
2148

22-
let executor = EXECUTOR_LOW.init(Executor::new());
49+
// Start up the low priority thread.
50+
let mut thread = LOW_THREAD
51+
.init_once(LOW_STACK.init_once(()).unwrap())
52+
.unwrap();
53+
thread.set_priority(LOW_PRIO);
54+
thread.spawn(move || {
55+
low_executor();
56+
});
57+
58+
info!("Starting Embassy executor on {}", zephyr::kconfig::CONFIG_BOARD);
59+
60+
let executor = EXECUTOR_MAIN.init(Executor::new());
2361
executor.run(|spawner| {
24-
spawner.spawn(sample_task(spawner)).unwrap();
62+
spawner.spawn(main(spawner)).unwrap();
2563
})
2664
}
2765

28-
static CHAN: Channel<CriticalSectionRawMutex, usize, 1> = Channel::new();
66+
/// The low priority executor.
67+
fn low_executor() -> ! {
68+
let executor = EXECUTOR_LOW.init(Executor::new());
69+
executor.run(|spawner| {
70+
LOW_SPAWNER.try_send(spawner.make_send()).ok().unwrap();
71+
})
72+
}
2973

3074
#[embassy_executor::task]
31-
async fn sample_task(spawner: Spawner) {
32-
info!("Started once");
33-
spawner.spawn(other_task(spawner)).unwrap();
34-
loop {
35-
// Wait for a message.
36-
let msg = CHAN.receive().await;
37-
info!("main task got: {}", msg);
75+
async fn main(spawner: Spawner) {
76+
info!("Benchmark begin");
77+
78+
let low_spawner = LOW_SPAWNER.receive().await;
79+
80+
let tester = ThreadTests::new(NUM_THREADS);
81+
82+
tester.run(spawner, low_spawner, Command::Empty).await;
83+
tester.run(spawner, low_spawner, Command::Empty).await;
84+
tester.run(spawner, low_spawner, Command::PingPong(10_000)).await;
85+
}
86+
87+
/// Async task tests.
88+
///
89+
/// For each test, we have a set of threads that do work, a "high priority" thread higher than those
90+
/// and a low priority thread, lower than any of those. This is used to test operations in both a
91+
/// fast-path (message or semaphore always available), and slow path (thread must block and be woken
92+
/// by message coming in). Generally, this is determined by whether high or low priority tasks are
93+
/// providing the data.
94+
struct ThreadTests {
95+
/// How many threads were actually asked for.
96+
count: usize,
97+
98+
/// Forward channels, acts as semaphores forward.
99+
forward: heapless::Vec<Channel<CriticalSectionRawMutex, (), 1>, NUM_THREADS>,
100+
101+
back: Channel<CriticalSectionRawMutex, (), 1>,
102+
103+
/// Each worker sends results back through this.
104+
answers: Channel<CriticalSectionRawMutex, Answer, 1>,
105+
}
106+
107+
impl ThreadTests {
108+
/// Construct the tests.
109+
///
110+
/// Note that this uses a single StaticCell, and therefore can only be called once.
111+
fn new(count: usize) -> &'static Self {
112+
static THIS: StaticCell<ThreadTests> = StaticCell::new();
113+
let this = THIS.init(Self {
114+
count,
115+
forward: heapless::Vec::new(),
116+
back: Channel::new(),
117+
answers: Channel::new(),
118+
});
119+
120+
for _ in 0..count {
121+
this.forward.push(Channel::new()).ok().unwrap();
122+
}
123+
124+
this
38125
}
126+
127+
async fn run(&'static self, spawner: Spawner, low_spawner: SendSpawner, command: Command) {
128+
let desc = format!("{:?}", command);
129+
let timer = BenchTimer::new(&desc, self.count * command.get_count());
130+
131+
let mut answers: heapless::Vec<Option<usize>, NUM_THREADS> = heapless::Vec::new();
132+
for _ in 0..self.count {
133+
answers.push(None).unwrap();
134+
}
135+
let mut low = false;
136+
let mut msg_count = (1 + self.count) as isize;
137+
138+
// Fire off all of the workers.
139+
for id in 0..self.count {
140+
spawner.spawn(worker(self, id, command)).unwrap();
141+
}
142+
143+
// And the "low" priority thread (which isn't lower at this time).
144+
low_spawner.spawn(low_task(self, command)).unwrap();
145+
//let _ = low_spawner;
146+
//spawner.spawn(low_task(self, command)).unwrap();
147+
148+
// Now wait for all of the responses.
149+
loop {
150+
match self.answers.receive().await {
151+
Answer::Worker { id, count } => {
152+
if answers[id].replace(count).is_some() {
153+
panic!("Multiple results from worker {}", id);
154+
}
155+
msg_count -= 1;
156+
if msg_count <= 0 {
157+
break;
158+
}
159+
}
160+
161+
Answer::Low => {
162+
if low {
163+
panic!("Multiple result from 'low' worker");
164+
}
165+
low = true;
166+
167+
msg_count -= 1;
168+
if msg_count <= 0 {
169+
break;
170+
}
171+
}
172+
}
173+
}
174+
175+
if msg_count != 0 {
176+
panic!("Invalid number of replies\n");
177+
}
178+
179+
timer.stop();
180+
}
181+
}
182+
183+
/// An individual work thread. This performs the specified operation, returning the result.
184+
#[embassy_executor::task(pool_size = NUM_THREADS)]
185+
async fn worker(this: &'static ThreadTests, id: usize, command: Command) {
186+
let mut total = 0;
187+
188+
match command {
189+
Command::Empty => {
190+
// Nothing to do.
191+
}
192+
Command::PingPong(count) => {
193+
// The ping pong test, reads messages from an indexed channel (one for each worker), and
194+
// replies to a shared channel.
195+
for _ in 0..count {
196+
this.forward[id].receive().await;
197+
this.back.send(()).await;
198+
total += 1;
199+
}
200+
}
201+
}
202+
203+
this.answers.send(Answer::Worker { id, count: total }).await;
39204
}
40205

206+
/// The low priority worker for the given command. Exits when finished.
41207
#[embassy_executor::task]
42-
async fn other_task(_spawner: Spawner) {
43-
info!("The other task");
44-
let mut count = 0;
45-
loop {
46-
CHAN.send(count).await;
47-
count = count.wrapping_add(1);
208+
async fn low_task(this: &'static ThreadTests, command: Command) {
209+
match command {
210+
Command::Empty => {
211+
// Nothing to do.
212+
}
213+
Command::PingPong(count) => {
214+
// Each worker expects a message to tell it to work, and will reply with its answer.
215+
for _ in 0..count {
216+
for forw in &this.forward {
217+
forw.send(()).await;
218+
this.back.receive().await;
219+
}
220+
}
221+
}
222+
}
223+
224+
this.answers.send(Answer::Low).await;
225+
}
48226

49-
Timer::after(Duration::from_secs(1)).await;
227+
#[derive(Copy, Clone, Debug)]
228+
enum Command {
229+
/// The empty test. Does nothing, but invokes everything. Useful to determine overhead.
230+
Empty,
231+
/// Ping-pong test. Each thread waits for a message on its own channel, and then replies on a shared
232+
/// channel to a common worker that is performing these operations.
233+
PingPong(usize),
234+
}
235+
236+
impl Command {
237+
/// Return how many operations this particular command invokes.
238+
fn get_count(self) -> usize {
239+
match self {
240+
Self::Empty => 0,
241+
Self::PingPong(count) => count,
242+
}
50243
}
51244
}
245+
246+
#[derive(Debug)]
247+
enum Answer {
248+
/// A worker has finished its processing.
249+
Worker {
250+
/// What is the id of this worker.
251+
id: usize,
252+
/// Operation count.
253+
count: usize,
254+
},
255+
/// The low priority task has completed.
256+
Low,
257+
}
258+
259+
// TODO: Put this benchmarking stuff somewhere useful.
260+
fn now() -> u64 {
261+
unsafe { k_cycle_get_64() }
262+
}
263+
264+
/// Timing some operations.
265+
///
266+
/// To use:
267+
/// ```
268+
/// /// 500 is the number of iterations happening.
269+
/// let timer = BenchTimer::new("My thing", 500);
270+
/// // operations
271+
/// timer.stop();
272+
/// ```
273+
pub struct BenchTimer<'a> {
274+
what: &'a str,
275+
start: u64,
276+
count: usize,
277+
}
278+
279+
impl<'a> BenchTimer<'a> {
280+
pub fn new(what: &'a str, count: usize) -> Self {
281+
Self {
282+
what,
283+
start: now(),
284+
count,
285+
}
286+
}
287+
288+
pub fn stop(self) {
289+
let stop = now();
290+
let time =
291+
(stop - self.start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0;
292+
let time = if self.count > 0 {
293+
time / (self.count as f64) * 1000.0
294+
} else {
295+
0.0
296+
};
297+
298+
printkln!(" {:8.3} us, {} of {}", time, self.count, self.what);
299+
}
300+
}
301+
302+
kobj_define! {
303+
static LOW_THREAD: StaticThread;
304+
static LOW_STACK: ThreadStack<THREAD_STACK_SIZE>;
305+
}

0 commit comments

Comments
 (0)