Skip to content

expand the run_coroutine_threadsafe recipies #127576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1067,14 +1067,59 @@ Scheduling From Other Threads
This function is meant to be called from a different OS thread
than the one where the event loop is running. Example::

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3
def in_thread(loop: asyncio.AbstractEventLoop) -> None:
# Run some blocking IO
pathlib.Path("example.txt").write_text("hello world", encoding="utf8")

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout=2) == 3

async def amain() -> None:
# Get the running loop
loop = asyncio.get_running_loop()

# Run something in a thread
await asyncio.to_thread(in_thread, loop)

It's also possible to run the other way around. Example::

@contextlib.contextmanager
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I think the above simpler example is sufficient, this one looks very contrived to me involving thread pool executor and all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to get any failures out, if they happen when constructing the loop

Copy link
Contributor

@kumaraditya303 kumaraditya303 Dec 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that can we avoid mixing concurrent futures executors with loop? Starting loop in a thread should be sufficient here no? I don't want to encourage such code which creates loop in another thread but runs code from another thread especially using concurrent futures.

Copy link
Contributor Author

@graingert graingert Dec 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to be able to start and stop the thread, and collect errors that get raised from asyncio.run, it's complicated to do that with threading.Thread

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of something like this:

from threading import Thread
import contextlib
import asyncio
import concurrent.futures


@contextlib.contextmanager
def get_loop():
    loop_fut = concurrent.futures.Future()
    stop = asyncio.Event()

    async def runner():
        loop_fut.set_result(asyncio.get_event_loop())
        await stop.wait()

    try:
        t = Thread(target=lambda: asyncio.run(runner()))
        t.start()
        loop = loop_fut.result()
        yield loop
    finally:
        loop.call_soon_threadsafe(stop.set)
        t.join()


with get_loop() as loop:
    r = asyncio.run_coroutine_threadsafe(
        asyncio.sleep(1, result="hello"), loop
    ).result()
    print(r)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But ThreadPoolExecutor(1) does all that for you, and collects any result from asyncio.run. Eg your code will hang if the OS has run out of file descriptors, mine will propagate the exception

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I take it, I am not really a fan of this pattern but I am fine if you wish.

loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]()
stop_event = asyncio.Event()

async def main() -> None:
loop_fut.set_result(asyncio.get_running_loop())
await stop_event.wait()

with concurrent.futures.ThreadPoolExecutor(1) as tpe:
complete_fut = tpe.submit(asyncio.run, main())
for fut in concurrent.futures.as_completed((loop_fut, complete_fut)):
if fut is loop_fut:
loop = loop_fut.result()
try:
yield loop
finally:
loop.call_soon_threadsafe(stop_event.set)
else:
fut.result()

# Create a loop in another thread
with loop_in_thread() as loop:
# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout=2) == 3

If an exception is raised in the coroutine, the returned Future
will be notified. It can also be used to cancel the task in
Expand Down
Loading