-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
expand the run_coroutine_threadsafe recipies #127576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expand the run_coroutine_threadsafe recipies #127576
Conversation
It's also possible to run the other way around. Example:: | ||
|
||
@contextlib.contextmanager | ||
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think the above simpler example is sufficient, this one looks very contrived to me involving thread pool executor and all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used to get any failures out, if they happen when constructing the loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant that can we avoid mixing concurrent futures executors with loop? Starting loop in a thread should be sufficient here no? I don't want to encourage such code which creates loop in another thread but runs code from another thread especially using concurrent futures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to be able to start and stop the thread, and collect errors that get raised from asyncio.run, it's complicated to do that with threading.Thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking of something like this:
from threading import Thread
import contextlib
import asyncio
import concurrent.futures
@contextlib.contextmanager
def get_loop():
loop_fut = concurrent.futures.Future()
stop = asyncio.Event()
async def runner():
loop_fut.set_result(asyncio.get_event_loop())
await stop.wait()
try:
t = Thread(target=lambda: asyncio.run(runner()))
t.start()
loop = loop_fut.result()
yield loop
finally:
loop.call_soon_threadsafe(stop.set)
t.join()
with get_loop() as loop:
r = asyncio.run_coroutine_threadsafe(
asyncio.sleep(1, result="hello"), loop
).result()
print(r)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But ThreadPoolExecutor(1) does all that for you, and collects any result from asyncio.run. Eg your code will hang if the OS has run out of file descriptors, mine will propagate the exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay I take it, I am not really a fan of this pattern but I am fine if you wish.
Co-authored-by: Kumar Aditya <[email protected]>
Co-authored-by: Kumar Aditya <[email protected]>
I see a lot of people having trouble using run_coroutine_threadsafe correctly:
I've expanded the examples to cover these cases.
📚 Documentation preview 📚: https://cpython-previews--127576.org.readthedocs.build/