Skip to content

Commit 61c8d11

Browse files
authored
Add a test using its own executor (until the Kani library includes one) (rust-lang#1658)
1 parent a2ecd4b commit 61c8d11

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

tests/kani/AsyncAwait/manual_spawn.rs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright Kani Contributors
2+
// SPDX-License-Identifier: Apache-2.0 OR MIT
3+
//
4+
// compile-flags: --edition 2018
5+
6+
//! This file tests a hand-written spawn infrastructure and executor.
7+
//! This should be replaced with code from the Kani library as soon as the executor can get merged.
8+
//! Tracking issue: https://github.com/model-checking/kani/issues/1685
9+
10+
use std::{
11+
future::Future,
12+
pin::Pin,
13+
sync::{
14+
atomic::{AtomicI64, Ordering},
15+
Arc,
16+
},
17+
task::{Context, RawWaker, RawWakerVTable, Waker},
18+
};
19+
20+
/// A dummy waker, which is needed to call [`Future::poll`]
21+
const NOOP_RAW_WAKER: RawWaker = {
22+
#[inline]
23+
unsafe fn clone_waker(_: *const ()) -> RawWaker {
24+
NOOP_RAW_WAKER
25+
}
26+
27+
#[inline]
28+
unsafe fn noop(_: *const ()) {}
29+
30+
RawWaker::new(std::ptr::null(), &RawWakerVTable::new(clone_waker, noop, noop, noop))
31+
};
32+
33+
static mut GLOBAL_EXECUTOR: Scheduler = Scheduler::new();
34+
const MAX_TASKS: usize = 16;
35+
36+
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Sync + 'static>>;
37+
38+
/// Indicates to the scheduler whether it can `assume` that the returned task is running.
39+
/// This is useful if the task was picked nondeterministically using `any()`.
40+
pub enum SchedulingOptimization {
41+
CanAssumeRunning,
42+
CannotAssumeRunning,
43+
}
44+
45+
/// Allows to parameterize how the scheduler picks the next task to poll in `spawnable_block_on`
46+
pub trait SchedulingStrategy {
47+
/// Picks the next task to be scheduled whenever the scheduler needs to pick a task to run next, and whether it can be assumed that the picked task is still running
48+
///
49+
/// Tasks are numbered `0..num_tasks`.
50+
/// For example, if pick_task(4) returns `(2, CanAssumeRunning)` than it picked the task with index 2 and allows Kani to `assume` that this task is still running.
51+
/// This is useful if the task is chosen nondeterministicall (`kani::any()`) and allows the verifier to discard useless execution branches (such as polling a completed task again).
52+
fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingOptimization);
53+
}
54+
55+
/// Keeps cycling through the tasks in a deterministic order
56+
#[derive(Default)]
57+
pub struct RoundRobin {
58+
index: usize,
59+
}
60+
61+
impl SchedulingStrategy for RoundRobin {
62+
fn pick_task(&mut self, num_tasks: usize) -> (usize, SchedulingOptimization) {
63+
self.index = (self.index + 1) % num_tasks;
64+
(self.index, SchedulingOptimization::CannotAssumeRunning)
65+
}
66+
}
67+
68+
pub struct Scheduler {
69+
/// Using a Vec instead of an array makes the runtime increase by a factor of 200.
70+
tasks: [Option<BoxFuture>; MAX_TASKS],
71+
num_tasks: usize,
72+
num_running: usize,
73+
}
74+
75+
impl Scheduler {
76+
/// Creates a scheduler with an empty task list
77+
pub const fn new() -> Scheduler {
78+
const INIT: Option<BoxFuture> = None;
79+
Scheduler { tasks: [INIT; MAX_TASKS], num_tasks: 0, num_running: 0 }
80+
}
81+
82+
/// Adds a future to the scheduler's task list, returning a JoinHandle
83+
pub fn spawn<F: Future<Output = ()> + Sync + 'static>(&mut self, fut: F) -> JoinHandle {
84+
let index = self.num_tasks;
85+
self.tasks[index] = Some(Box::pin(fut));
86+
self.num_tasks += 1;
87+
assert!(self.num_tasks < MAX_TASKS, "more than {} tasks", MAX_TASKS);
88+
self.num_running += 1;
89+
JoinHandle { index }
90+
}
91+
92+
/// Runs the scheduler with the given scheduling plan until all tasks have completed
93+
pub fn run(&mut self, mut scheduling_plan: impl SchedulingStrategy) {
94+
let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
95+
let cx = &mut Context::from_waker(&waker);
96+
while self.num_running > 0 {
97+
let (index, can_assume_running) = scheduling_plan.pick_task(self.num_tasks);
98+
let task = &mut self.tasks[index];
99+
if let Some(fut) = task.as_mut() {
100+
match fut.as_mut().poll(cx) {
101+
std::task::Poll::Ready(()) => {
102+
self.num_running -= 1;
103+
let _prev = std::mem::replace(task, None);
104+
}
105+
std::task::Poll::Pending => (),
106+
}
107+
} else if let SchedulingOptimization::CanAssumeRunning = can_assume_running {
108+
#[cfg(kani)]
109+
kani::assume(false); // useful so that we can assume that a nondeterministically picked task is still running
110+
}
111+
}
112+
}
113+
114+
/// Polls the given future and the tasks it may spawn until all of them complete.
115+
pub fn block_on<F: Future<Output = ()> + Sync + 'static>(
116+
&mut self,
117+
fut: F,
118+
scheduling_plan: impl SchedulingStrategy,
119+
) {
120+
self.spawn(fut);
121+
self.run(scheduling_plan);
122+
}
123+
}
124+
125+
/// Result of spawning a task.
126+
///
127+
/// If you `.await` a JoinHandle, this will wait for the spawned task to complete.
128+
pub struct JoinHandle {
129+
index: usize,
130+
}
131+
132+
impl Future for JoinHandle {
133+
type Output = ();
134+
135+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
136+
if unsafe { GLOBAL_EXECUTOR.tasks[self.index].is_some() } {
137+
std::task::Poll::Pending
138+
} else {
139+
cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
140+
std::task::Poll::Ready(())
141+
}
142+
}
143+
}
144+
145+
#[inline] // to work around linking issue
146+
pub fn spawn<F: Future<Output = ()> + Sync + 'static>(fut: F) -> JoinHandle {
147+
unsafe { GLOBAL_EXECUTOR.spawn(fut) }
148+
}
149+
150+
/// Polls the given future and the tasks it may spawn until all of them complete
151+
///
152+
/// Contrary to block_on, this allows `spawn`ing other futures
153+
pub fn spawnable_block_on<F: Future<Output = ()> + Sync + 'static>(
154+
fut: F,
155+
scheduling_plan: impl SchedulingStrategy,
156+
) {
157+
unsafe {
158+
GLOBAL_EXECUTOR.block_on(fut, scheduling_plan);
159+
}
160+
}
161+
162+
struct YieldNow {
163+
yielded: bool,
164+
}
165+
166+
impl Future for YieldNow {
167+
type Output = ();
168+
169+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
170+
if self.yielded {
171+
cx.waker().wake_by_ref(); // For completeness. But Kani currently ignores wakers.
172+
std::task::Poll::Ready(())
173+
} else {
174+
self.yielded = true;
175+
std::task::Poll::Pending
176+
}
177+
}
178+
}
179+
180+
/// Suspends execution of the current future, to allow the scheduler to poll another future
181+
pub fn yield_now() -> impl Future<Output = ()> {
182+
YieldNow { yielded: false }
183+
}
184+
185+
#[kani::proof]
186+
#[kani::unwind(4)]
187+
fn arc_spawn_deterministic_test() {
188+
let x = Arc::new(AtomicI64::new(0)); // Surprisingly, Arc verified faster than Rc
189+
let x2 = x.clone();
190+
spawnable_block_on(
191+
async move {
192+
let x3 = x2.clone();
193+
spawn(async move {
194+
x3.fetch_add(1, Ordering::Relaxed);
195+
});
196+
yield_now().await;
197+
x2.fetch_add(1, Ordering::Relaxed);
198+
},
199+
RoundRobin::default(),
200+
);
201+
assert_eq!(x.load(Ordering::Relaxed), 2);
202+
}

0 commit comments

Comments
 (0)