Skip to content

Commit 4f4d36b

Browse files
implement task locals
1 parent 2f922ed commit 4f4d36b

File tree

7 files changed

+172
-199
lines changed

7 files changed

+172
-199
lines changed

src/task/block_on.rs

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::future::Future;
22

3-
use kv_log_macro::trace;
4-
5-
use crate::task::Task;
3+
use crate::task::Builder;
64

75
/// Spawns a task and blocks the current thread on its result.
86
///
@@ -31,33 +29,5 @@ pub fn block_on<F, T>(future: F) -> T
3129
where
3230
F: Future<Output = T>,
3331
{
34-
// Create a new task handle.
35-
let task = Task::new(None);
36-
37-
// Log this `block_on` operation.
38-
trace!("block_on", {
39-
task_id: task.id().0,
40-
parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
41-
});
42-
43-
let wrapped_future = async move {
44-
// Drop task-locals on exit.
45-
defer! {
46-
Task::get_current(|t| unsafe { t.drop_locals() });
47-
}
48-
49-
// Log completion on exit.
50-
defer! {
51-
trace!("completed", {
52-
task_id: Task::get_current(|t| t.id().0),
53-
});
54-
}
55-
56-
future.await
57-
};
58-
59-
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
60-
61-
// Run the future as a task.
62-
unsafe { Task::set_current(&task, || smol::block_on(wrapped_future)) }
32+
Builder::new().blocking(future)
6333
}

src/task/builder.rs

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::sync::Arc;
4+
use std::task::{Context, Poll};
25

36
use kv_log_macro::trace;
47

58
use crate::io;
6-
use crate::task::{JoinHandle, Task};
9+
use crate::task::{JoinHandle, Task, TaskLocalsWrapper};
710

811
/// Task builder that configures the settings of a new task.
912
#[derive(Debug, Default)]
@@ -25,41 +28,83 @@ impl Builder {
2528
self
2629
}
2730

31+
fn build<F, T>(self, future: F) -> SupportTaskLocals<F>
32+
where
33+
F: Future<Output = T>,
34+
{
35+
let name = self.name.map(Arc::new);
36+
37+
// Create a new task handle.
38+
let task = Task::new(name);
39+
40+
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
41+
42+
let tag = TaskLocalsWrapper::new(task.clone());
43+
44+
// FIXME: do not require all futures to be boxed.
45+
SupportTaskLocals {
46+
tag,
47+
future: Box::pin(future),
48+
}
49+
}
50+
2851
/// Spawns a task with the configured settings.
2952
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
3053
where
3154
F: Future<Output = T> + Send + 'static,
3255
T: Send + 'static,
3356
{
34-
// Create a new task handle.
35-
let task = Task::new(self.name);
57+
let wrapped = self.build(future);
3658

3759
// Log this `spawn` operation.
3860
trace!("spawn", {
39-
task_id: task.id().0,
40-
parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
61+
task_id: wrapped.tag.id().0,
62+
parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
4163
});
4264

43-
let wrapped_future = async move {
44-
// Drop task-locals on exit.
45-
defer! {
46-
Task::get_current(|t| unsafe { t.drop_locals() });
47-
}
48-
49-
// Log completion on exit.
50-
defer! {
51-
trace!("completed", {
52-
task_id: Task::get_current(|t| t.id().0),
53-
});
54-
}
55-
future.await
56-
};
65+
let task = wrapped.tag.task().clone();
66+
let smol_task = smol::Task::spawn(wrapped).detach();
5767

58-
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
68+
Ok(JoinHandle::new(smol_task, task))
69+
}
5970

60-
// FIXME: figure out how to set the current task.
71+
/// Spawns a task with the configured settings, blocking on its execution.
72+
pub fn blocking<F, T>(self, future: F) -> T
73+
where
74+
F: Future<Output = T>,
75+
{
76+
let wrapped = self.build(future);
6177

62-
let smol_task = smol::Task::spawn(wrapped_future).detach();
63-
Ok(JoinHandle::new(smol_task, task))
78+
// Log this `block_on` operation.
79+
trace!("block_on", {
80+
task_id: wrapped.tag.id().0,
81+
parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
82+
});
83+
84+
// Run the future as a task.
85+
unsafe { TaskLocalsWrapper::set_current(&wrapped.tag, || smol::block_on(wrapped)) }
86+
}
87+
}
88+
89+
/// Wrapper to add support for task locals.
90+
struct SupportTaskLocals<F> {
91+
tag: TaskLocalsWrapper,
92+
future: Pin<Box<F>>,
93+
}
94+
95+
impl<F> Drop for SupportTaskLocals<F> {
96+
fn drop(&mut self) {
97+
// Log completion on exit.
98+
trace!("completed", {
99+
task_id: self.tag.id().0,
100+
});
101+
}
102+
}
103+
104+
impl<F: Future> Future for SupportTaskLocals<F> {
105+
type Output = F::Output;
106+
107+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108+
unsafe { TaskLocalsWrapper::set_current(&self.tag, || Pin::new(&mut self.future).poll(cx)) }
64109
}
65110
}

src/task/current.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::task::Task;
1+
use crate::task::{Task, TaskLocalsWrapper};
22

33
/// Returns a handle to the current task.
44
///
@@ -23,6 +23,6 @@ use crate::task::Task;
2323
/// # })
2424
/// ```
2525
pub fn current() -> Task {
26-
Task::get_current(|t| t.clone())
26+
TaskLocalsWrapper::get_current(|t| t.task().clone())
2727
.expect("`task::current()` called outside the context of a task")
2828
}

src/task/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ cfg_default! {
142142
pub use task_local::{AccessError, LocalKey};
143143

144144
pub(crate) use task_local::LocalsMap;
145+
pub(crate) use task_locals_wrapper::TaskLocalsWrapper;
145146

146147
mod block_on;
147148
mod builder;
@@ -153,6 +154,7 @@ cfg_default! {
153154
mod task;
154155
mod task_id;
155156
mod task_local;
157+
mod task_locals_wrapper;
156158

157159
#[cfg(any(feature = "unstable", test))]
158160
pub use spawn_blocking::spawn_blocking;

src/task/task.rs

Lines changed: 12 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,32 @@
1-
use std::cell::Cell;
21
use std::fmt;
3-
use std::mem::ManuallyDrop;
4-
use std::ptr;
5-
use std::sync::atomic::{AtomicPtr, Ordering};
62
use std::sync::Arc;
73

8-
use crate::task::{LocalsMap, TaskId};
9-
use crate::utils::abort_on_panic;
4+
use crate::task::TaskId;
105

11-
thread_local! {
12-
/// A pointer to the currently running task.
13-
static CURRENT: Cell<*const Task> = Cell::new(ptr::null_mut());
14-
}
15-
16-
/// The inner representation of a task handle.
17-
struct Inner {
6+
/// A handle to a task.
7+
#[derive(Clone)]
8+
pub struct Task {
189
/// The task ID.
1910
id: TaskId,
2011

2112
/// The optional task name.
22-
name: Option<Box<str>>,
23-
24-
/// The map holding task-local values.
25-
locals: LocalsMap,
26-
}
27-
28-
impl Inner {
29-
#[inline]
30-
fn new(name: Option<String>) -> Inner {
31-
Inner {
32-
id: TaskId::generate(),
33-
name: name.map(String::into_boxed_str),
34-
locals: LocalsMap::new(),
35-
}
36-
}
13+
name: Option<Arc<String>>,
3714
}
3815

39-
/// A handle to a task.
40-
pub struct Task {
41-
/// The inner representation.
42-
///
43-
/// This pointer is lazily initialized on first use. In most cases, the inner representation is
44-
/// never touched and therefore we don't allocate it unless it's really needed.
45-
inner: AtomicPtr<Inner>,
46-
}
47-
48-
unsafe impl Send for Task {}
49-
unsafe impl Sync for Task {}
50-
5116
impl Task {
5217
/// Creates a new task handle.
53-
///
54-
/// If the task is unnamed, the inner representation of the task will be lazily allocated on
55-
/// demand.
5618
#[inline]
57-
pub(crate) fn new(name: Option<String>) -> Task {
58-
let inner = match name {
59-
None => AtomicPtr::default(),
60-
Some(name) => {
61-
let raw = Arc::into_raw(Arc::new(Inner::new(Some(name))));
62-
AtomicPtr::new(raw as *mut Inner)
63-
}
64-
};
65-
Task { inner }
19+
pub(crate) fn new(name: Option<Arc<String>>) -> Task {
20+
Task {
21+
id: TaskId::generate(),
22+
name,
23+
}
6624
}
6725

6826
/// Gets the task's unique identifier.
6927
#[inline]
7028
pub fn id(&self) -> TaskId {
71-
self.inner().id
29+
self.id
7230
}
7331

7432
/// Returns the name of this task.
@@ -77,93 +35,7 @@ impl Task {
7735
///
7836
/// [`Builder::name`]: struct.Builder.html#method.name
7937
pub fn name(&self) -> Option<&str> {
80-
self.inner().name.as_ref().map(|s| &**s)
81-
}
82-
83-
/// Returns the map holding task-local values.
84-
pub(crate) fn locals(&self) -> &LocalsMap {
85-
&self.inner().locals
86-
}
87-
88-
/// Drops all task-local values.
89-
///
90-
/// This method is only safe to call at the end of the task.
91-
#[inline]
92-
pub(crate) unsafe fn drop_locals(&self) {
93-
let raw = self.inner.load(Ordering::Acquire);
94-
if let Some(inner) = raw.as_mut() {
95-
// Abort the process if dropping task-locals panics.
96-
abort_on_panic(|| {
97-
inner.locals.clear();
98-
});
99-
}
100-
}
101-
102-
/// Returns the inner representation, initializing it on first use.
103-
fn inner(&self) -> &Inner {
104-
loop {
105-
let raw = self.inner.load(Ordering::Acquire);
106-
if !raw.is_null() {
107-
return unsafe { &*raw };
108-
}
109-
110-
let new = Arc::into_raw(Arc::new(Inner::new(None))) as *mut Inner;
111-
if self.inner.compare_and_swap(raw, new, Ordering::AcqRel) != raw {
112-
unsafe {
113-
drop(Arc::from_raw(new));
114-
}
115-
}
116-
}
117-
}
118-
119-
/// Set a reference to the current task.
120-
pub(crate) unsafe fn set_current<F, R>(task: *const Task, f: F) -> R
121-
where
122-
F: FnOnce() -> R,
123-
{
124-
CURRENT.with(|current| {
125-
let old_task = current.replace(task);
126-
defer! {
127-
current.set(old_task);
128-
}
129-
f()
130-
})
131-
}
132-
133-
/// Gets a reference to the current task.
134-
pub(crate) fn get_current<F, R>(f: F) -> Option<R>
135-
where
136-
F: FnOnce(&Task) -> R,
137-
{
138-
let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) });
139-
match res {
140-
Ok(Some(val)) => Some(val),
141-
Ok(None) | Err(_) => None,
142-
}
143-
}
144-
}
145-
146-
impl Drop for Task {
147-
fn drop(&mut self) {
148-
// Deallocate the inner representation if it was initialized.
149-
let raw = *self.inner.get_mut();
150-
if !raw.is_null() {
151-
unsafe {
152-
drop(Arc::from_raw(raw));
153-
}
154-
}
155-
}
156-
}
157-
158-
impl Clone for Task {
159-
fn clone(&self) -> Task {
160-
// We need to make sure the inner representation is initialized now so that this instance
161-
// and the clone have raw pointers that point to the same `Arc<Inner>`.
162-
let arc = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) };
163-
let raw = Arc::into_raw(Arc::clone(&arc));
164-
Task {
165-
inner: AtomicPtr::new(raw as *mut Inner),
166-
}
38+
self.name.as_ref().map(|s| s.as_str())
16739
}
16840
}
16941

src/task/task_local.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::error::Error;
33
use std::fmt;
44
use std::sync::atomic::{AtomicU32, Ordering};
55

6-
use crate::task::Task;
6+
use crate::task::TaskLocalsWrapper;
77

88
/// The key for accessing a task-local value.
99
///
@@ -98,7 +98,7 @@ impl<T: Send + 'static> LocalKey<T> {
9898
where
9999
F: FnOnce(&T) -> R,
100100
{
101-
Task::get_current(|task| unsafe {
101+
TaskLocalsWrapper::get_current(|task| unsafe {
102102
// Prepare the numeric key, initialization function, and the map of task-locals.
103103
let key = self.key();
104104
let init = || Box::new((self.__init)()) as Box<dyn Send>;

0 commit comments

Comments
 (0)