Async: The future belongs to a different loop than the one specified as the loop argument #868
I think there is some sort of internal transaction limit that is being reached, causing the above to silently fail. I found that limiting the number of concurrent tasks with a semaphore works around it:

```python
async def main2(urls):
    sem = asyncio.Semaphore(100)

    async def safe_process(rss_feed, url):
        async with sem:
            return await db_writer.process_rss_feed(rss_feed, url)

    start = time()
    tasks = []
    for url in tqdm_original(urls):
        try:
            rss_feed = feedparser.parse(url)
            task = asyncio.create_task(safe_process(rss_feed, url))
            tasks.append(task)
        except TimeoutError:
            logging.error(f"❗️Timeout {url}")
    await tqdm.asyncio.tqdm.gather(*tasks)
    print(f"Finished in {(time() - start) / 60:.1f} minutes")
```

It might be useful to see what is failing in Neo4j so that a more useful warning can be shown.
I have been looking into this for quite a while now and am sadly not able to reproduce the error. I'm also very surprised by it, because the driver is certainly not starting or stopping any event loops. And if you only call […] Until then, here is some general advice on your code.
I think I have found a way to reproduce the error just now: I created the async driver outside of […]
Thanks for looking into this, and I'm glad you were able to reproduce it. The scenario in your last message is exactly how I have my code set up, so your diagnosis is spot on. At the moment my code is structured using a class (essentially using the same style as the […]). Limiting concurrent tasks to 100 with a semaphore […]
This is a fragile solution and we might decide to make this usage of the driver fail in the future. The reason this works at all is that the driver internally uses async synchronization constructs, and up to Python 3.9 those primitives bind to an event loop at creation time (they call `get_event_loop()` in their constructor) […] So I highly recommend that you refactor your code either way.
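For readers hitting the same error, here is a sketch of the recommended structure. The URI, credentials, and query are placeholders, and it assumes the neo4j 5.x async API with a reachable server: the key point is that the driver is created, used, and closed entirely inside the coroutine passed to `asyncio.run()`, so everything binds to the same event loop.

```python
import asyncio

from neo4j import AsyncGraphDatabase  # assumes neo4j driver 5.x is installed


async def main():
    # Create the driver inside the running event loop so that any internal
    # synchronization primitives bind to this loop, and close it (via the
    # async context manager) before the loop shuts down.
    async with AsyncGraphDatabase.driver(
        "neo4j://localhost:7687",    # placeholder URI
        auth=("neo4j", "password"),  # placeholder credentials
    ) as driver:
        async with driver.session() as session:
            result = await session.run("RETURN 1 AS n")  # placeholder query
            record = await result.single()
            print(record["n"])


asyncio.run(main())
```

The anti-pattern to avoid is creating the driver at module level (or in a plain sync function) and then calling `asyncio.run()` afterwards, since each `asyncio.run()` call spins up a fresh event loop.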
OK, I see, this makes a lot of sense. I'll look into refactoring my code as soon as I can. Thanks!
For documentation purposes, should someone stumble across this later: in fact, I misread the new Condition code in Python 3.10+. It explicitly fixes the issue that used to arise from creating async synchronization primitives when no event loop was running. The condition now binds to a loop the first time it's actually used in an async context. This is even better, as it will just allow users to create an async driver in a sync function and use it asynchronously later. Important note: this will likely only work when the user is on Python 3.10+, because the driver also relies on synchronization primitives that come with […]
Bug Report
I am using `AsyncGraphDatabase` to build a database asynchronously. My code pulls data from many RSS feeds and writes certain components to the database. Each feed can be treated individually, hence the async approach. The feeds are first successfully downloaded (non-async) and put into a list of tasks, so I know it's not some HTTP error.

If I process a small batch of tasks (100), my code usually runs fine: everything runs asynchronously and I'm seeing lots of CPU utilization and the expected speedup. However, when I run on the full list (1500+), I run into this error:
My code is laid out like this:
Where `create_podcast`, `_find_number_of_episodes` & `populate_episode` are `async` and `await`ed in the same way described here. My code is launched like this:
Apologies for the lack of a reproducible example, but it seems to be a difficult issue to pin down due to its randomness. If you can steer me in the right direction, maybe I can help try and isolate it.
My Environment
Python Version: 3.10.8
Driver Version: 5.2.0
Server Version and Edition: Neo4j 4.3.6 enterprise
Operating System: Ubuntu 22.04