Skip to content

Commit 4589faa

Browse files
authored
[Proxying] Update proxying with callback to support cancellation (#18776)
Completely overhaul the interface for proxying with callbacks: - Rename `emscripten_proxy_async_with_callback` to the simpler `emscripten_proxy_callback`. - Add an optional `cancel` callback argument that will be scheduled on the proxying thread if the worker thread dies before the work is started. A future PR will change this so that the `cancel` callback will be called if a worker thread dies before the work is finished to be more consistent with the `proxy_sync` APIs. - Remove the ability to receive the result of the proxied function as an argument to the callback function since the extra expressiveness was not general enough to warrant the added complexity. - Use only a single context argument rather than a separate context for the proxied function and the callback. This is just as expressive, but it is simpler avoids the question of what context object should be passed to the cancellation callback. Also update the only user of the callback API, dynlink.c, to use the new API. A future PR will implement a promise-based proxying API and update dynlink.c to use that instead, so this PR just tries to make the minimal necessary changes to dynlink.c.
1 parent 168dafb commit 4589faa

13 files changed

+330
-162
lines changed

ChangeLog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ See docs/process.md for more on how version tagging works.
4848
- Synchronous proxying functions in emscripten/proxying.h now return errors
4949
instead of hanging forever when the worker thread dies before the proxied work
5050
is finished.
51+
- The `emscripten_proxy_async_with_callback` API was replaced with a simpler
52+
`emscripten_proxy_callback` API that takes a second callback to be called if
53+
the worker thread dies before completing the proxied work.
5154

5255
3.1.31 - 01/26/23
5356
-----------------

site/source/docs/api_reference/proxying.h.rst

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,6 @@ Functions
7878
thread then return immediately without waiting for ``func`` to be executed.
7979
Returns 1 if the work was successfully enqueued or 0 otherwise.
8080
81-
.. c:function:: int emscripten_proxy_async_with_callback(em_proxying_queue* q, pthread_t target_thread, void* (*func)(void*), void* arg, void (*callback)(void* arg, void* result), void* callback_arg)
82-
83-
Enqueue `func` on the given queue and thread. Once (and if) it finishes
84-
executing, it will asynchronously proxy `callback` back to the current thread
85-
on the same queue. The result of the proxied function will be passed as the
86-
second argument to the callback. Returns 1 if the initial work was
87-
successfully enqueued and the target thread notified or 0 otherwise. If the
88-
callback cannot be scheduled (for example due to OOM), the program is aborted.
89-
9081
.. c:function:: int emscripten_proxy_sync(em_proxying_queue* q, pthread_t target_thread, void (*func)(void*), void* arg)
9182
9283
Enqueue ``func`` to be called with argument ``arg`` on the given queue and
@@ -103,6 +94,15 @@ Functions
10394
``emscripten_proxying_finish`` itself; it could instead store the context
10495
pointer and call ``emscripten_proxying_finish`` at an arbitrary later time.
10596

97+
.. c:function:: int emscripten_proxy_callback(em_proxying_queue* q, pthread_t target_thread, void (*func)(void*), void (*callback)(void*), void (*cancel)(void*), void* arg)
98+
99+
Enqueue ``func`` on the given queue and thread. Once (and if) it finishes
100+
executing, it will asynchronously proxy ``callback`` back to the current
101+
thread on the same queue, or if the target thread dies before the work can be
102+
completed, ``cancel`` will be proxied back instead. All three function will
103+
receive the same argument, ``arg``. Returns 1 if ``func`` was successfully
104+
enqueued and the target thread notified or 0 otherwise.
105+
106106
C++ API
107107
-------
108108
@@ -134,11 +134,11 @@ defined within namespace ``emscripten``.
134134
Calls ``emscripten_proxy_async`` to execute ``func``, returning ``true`` if the
135135
function was successfully enqueued and ``false`` otherwise.
136136
137-
.. cpp:member:: bool proxyAsyncWithCallback(pthread_t target, std::function<void()>&& func, std::function<void()>&& callback)
137+
.. cpp:member:: bool proxyCallback(pthread_t target, std::function<void()>&& func, std::function<void()>&& callback, std::function<void()>&& cancel)
138138
139-
Calls ``emscripten_proxy_async_with_callback`` to execute ``func`` and
140-
schedule ``callback``, returning ``true`` if the function was successfully
141-
enqueued and ``false`` otherwise.
139+
Calls ``emscripten_proxy_callback`` to execute ``func`` and schedule either
140+
``callback`` or ``cancel``, returning ``true`` if the function was
141+
successfully enqueued and ``false`` otherwise.
142142
143143
.. cpp:member:: bool proxySync(const pthread_t target, const std::function<void()>& func)
144144

src/library_pthread.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,13 @@ var LibraryPThread = {
10851085
#if PTHREADS_DEBUG
10861086
dbg('invokeEntryPoint: ' + ptrToString(ptr));
10871087
#endif
1088+
#if EXIT_RUNTIME && !MINIMAL_RUNTIME
1089+
// An old thread on this worker may have been canceled without returning the
1090+
// `runtimeKeepaliveCounter` to zero. Reset it now so the new thread won't
1091+
// be affected.
1092+
runtimeKeepaliveCounter = 0;
1093+
#endif
1094+
10881095
#if MAIN_MODULE
10891096
// Before we call the thread entry point, make sure any shared libraries
10901097
// have been loaded on this there. Otherwise our table migth be not be

system/include/emscripten/proxying.h

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,6 @@ int emscripten_proxy_async(em_proxying_queue* q,
5454
void (*func)(void*),
5555
void* arg);
5656

57-
// Enqueue `func` on the given queue and thread. Once (and if) it finishes
58-
// executing, it will asynchronously proxy `callback` back to the current thread
59-
// on the same queue. The result of the proxied function will be passed as the
60-
// second argument to the callback. Returns 1 if the initial work was
61-
// successfully enqueued and the target thread notified or 0 otherwise. If the
62-
// callback cannot be scheduled (for example due to OOM), the program is
63-
// aborted.
64-
int emscripten_proxy_async_with_callback(em_proxying_queue* q,
65-
pthread_t target_thread,
66-
void* (*func)(void*),
67-
void* arg,
68-
void (*callback)(void* arg,
69-
void* result),
70-
void* callback_arg);
71-
7257
// Enqueue `func` on the given queue and thread and wait for it to finish
7358
// executing before returning. Returns 1 if the task was successfully completed
7459
// and 0 otherwise, including if the target thread is canceled or exits before
@@ -90,6 +75,19 @@ int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
9075
void (*func)(em_proxying_ctx*, void*),
9176
void* arg);
9277

78+
// Enqueue `func` on the given queue and thread. Once (and if) it finishes
79+
// executing, it will asynchronously proxy `callback` back to the current thread
80+
// on the same queue, or if the target thread dies before the work can be
81+
// completed, `cancel` will be proxied back instead. All three function will
82+
// receive the same argument, `arg`. Returns 1 if `func` was successfully
83+
// enqueued and the target thread notified or 0 otherwise.
84+
int emscripten_proxy_callback(em_proxying_queue* q,
85+
pthread_t target_thread,
86+
void (*func)(void*),
87+
void (*callback)(void*),
88+
void (*cancel)(void*),
89+
void* arg);
90+
9391
#ifdef __cplusplus
9492
} // extern "C"
9593

@@ -111,19 +109,6 @@ class ProxyingQueue {
111109
delete f;
112110
}
113111

114-
static void* runAndFreeWithResult(void* arg) {
115-
auto* f = (std::function<void*()>*)arg;
116-
void* result = (*f)();
117-
delete f;
118-
return result;
119-
}
120-
121-
static void runAndFreeCallback(void* arg, void* result) {
122-
auto* f = (std::function<void(void*)>*)arg;
123-
(*f)(result);
124-
delete f;
125-
}
126-
127112
static void run(void* arg) {
128113
auto* f = (std::function<void()>*)arg;
129114
(*f)();
@@ -134,6 +119,37 @@ class ProxyingQueue {
134119
(*f)(ProxyingCtx{ctx});
135120
}
136121

122+
struct CallbackFuncs {
123+
std::function<void()> func;
124+
std::function<void()> callback;
125+
std::function<void()> cancel;
126+
127+
CallbackFuncs(std::function<void()>&& func,
128+
std::function<void()>&& callback,
129+
std::function<void()>&& cancel)
130+
: func(std::move(func)), callback(std::move(callback)),
131+
cancel(std::move(cancel)) {}
132+
};
133+
134+
static void runFunc(void* arg) {
135+
auto* info = (CallbackFuncs*)arg;
136+
info->func();
137+
}
138+
139+
static void runCallback(void* arg) {
140+
auto* info = (CallbackFuncs*)arg;
141+
info->callback();
142+
delete info;
143+
}
144+
145+
static void runCancel(void* arg) {
146+
auto* info = (CallbackFuncs*)arg;
147+
if (info->cancel) {
148+
info->cancel();
149+
}
150+
delete info;
151+
}
152+
137153
public:
138154
em_proxying_queue* queue = em_proxying_queue_create();
139155

@@ -142,11 +158,13 @@ class ProxyingQueue {
142158
ProxyingQueue() = default;
143159
ProxyingQueue& operator=(const ProxyingQueue&) = delete;
144160
ProxyingQueue& operator=(ProxyingQueue&& other) {
145-
if (queue) {
146-
em_proxying_queue_destroy(queue);
161+
if (this != &other) {
162+
if (queue) {
163+
em_proxying_queue_destroy(queue);
164+
}
165+
queue = other.queue;
166+
other.queue = nullptr;
147167
}
148-
queue = other.queue;
149-
other.queue = nullptr;
150168
return *this;
151169
}
152170

@@ -177,21 +195,11 @@ class ProxyingQueue {
177195
// Refer to the corresponding C API documentation.
178196
bool proxyAsync(pthread_t target, std::function<void()>&& func) {
179197
std::function<void()>* arg = new std::function<void()>(std::move(func));
180-
return emscripten_proxy_async(queue, target, runAndFree, (void*)arg);
181-
}
182-
183-
bool proxyAsyncWithCallback(pthread_t target,
184-
std::function<void*()>&& func,
185-
std::function<void(void*)>&& callback) {
186-
std::function<void*()>* arg = new std::function<void*()>(std::move(func));
187-
std::function<void(void*)>* callback_arg =
188-
new std::function<void(void*)>(std::move(callback));
189-
return emscripten_proxy_async_with_callback(queue,
190-
target,
191-
runAndFreeWithResult,
192-
(void*)arg,
193-
runAndFreeCallback,
194-
(void*)callback_arg);
198+
if (!emscripten_proxy_async(queue, target, runAndFree, (void*)arg)) {
199+
delete arg;
200+
return false;
201+
}
202+
return true;
195203
}
196204

197205
bool proxySync(const pthread_t target, const std::function<void()>& func) {
@@ -203,6 +211,20 @@ class ProxyingQueue {
203211
return emscripten_proxy_sync_with_ctx(
204212
queue, target, runWithCtx, (void*)&func);
205213
}
214+
215+
bool proxyCallback(pthread_t target,
216+
std::function<void()>&& func,
217+
std::function<void()>&& callback,
218+
std::function<void()>&& cancel) {
219+
CallbackFuncs* info = new CallbackFuncs(
220+
std::move(func), std::move(callback), std::move(cancel));
221+
if (!emscripten_proxy_callback(
222+
queue, target, runFunc, runCallback, runCancel, info)) {
223+
delete info;
224+
return false;
225+
}
226+
return true;
227+
}
206228
};
207229

208230
} // namespace emscripten

system/lib/libc/dynlink.c

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,15 @@ bool _emscripten_dlsync_self() {
303303
return true;
304304
}
305305

306-
static void* do_thread_sync(void* arg) {
306+
struct promise_result {
307+
em_promise_t promise;
308+
bool result;
309+
};
310+
311+
static void do_thread_sync(void* arg) {
307312
dbg("do_thread_sync");
308-
return (void*)_emscripten_dlsync_self();
313+
struct promise_result* info = arg;
314+
info->result = _emscripten_dlsync_self();
309315
}
310316

311317
static void do_thread_sync_out(void* arg) {
@@ -315,10 +321,11 @@ static void do_thread_sync_out(void* arg) {
315321
}
316322

317323
// Called once _emscripten_proxy_dlsync completes
318-
static void done_thread_sync(void* arg, void* result) {
319-
em_promise_t promise = (em_promise_t)arg;
320-
dbg("done_thread_sync: promise=%p result=%p", promise, result);
321-
if (result) {
324+
static void done_thread_sync(void* arg) {
325+
struct promise_result* info = arg;
326+
em_promise_t promise = info->promise;
327+
dbg("done_thread_sync: promise=%p result=%i", promise, info->result);
328+
if (info->result) {
322329
emscripten_promise_resolve(promise, EM_PROMISE_FULFILL, NULL);
323330
} else {
324331
#if ABORT_ON_SYNC_FAILURE
@@ -328,6 +335,7 @@ static void done_thread_sync(void* arg, void* result) {
328335
#endif
329336
}
330337
emscripten_promise_destroy(promise);
338+
free(info);
331339
}
332340

333341
// Proxying queue specically for handling code loading (dlopen) events.
@@ -356,13 +364,28 @@ int _emscripten_proxy_dlsync_async(pthread_t target_thread, em_promise_t promise
356364
if (!dlopen_proxying_queue) {
357365
dlopen_proxying_queue = em_proxying_queue_create();
358366
}
359-
int rtn = emscripten_proxy_async_with_callback(dlopen_proxying_queue,
360-
target_thread,
361-
do_thread_sync,
362-
NULL,
363-
done_thread_sync,
364-
promise);
365-
if (target_thread->sleeping) {
367+
368+
struct promise_result* info = malloc(sizeof(struct promise_result));
369+
if (!info) {
370+
return false;
371+
}
372+
*info = (struct promise_result){
373+
.promise = promise,
374+
.result = false,
375+
};
376+
int rtn = emscripten_proxy_callback(dlopen_proxying_queue,
377+
target_thread,
378+
do_thread_sync,
379+
done_thread_sync,
380+
done_thread_sync,
381+
info);
382+
if (!rtn) {
383+
// If we failed to proxy, then the target thread is no longer alive and no
384+
// longer needs to be caught up, so we can resolve the promise early.
385+
emscripten_promise_resolve(promise, EM_PROMISE_FULFILL, NULL);
386+
emscripten_promise_destroy(promise);
387+
free(info);
388+
} else if (target_thread->sleeping) {
366389
// If the target thread is in the sleeping state (and this check is
367390
// performed after the enqueuing of the async work) then we know its safe to
368391
// resolve the promise early, since the thread will process our event as

0 commit comments

Comments
 (0)