Skip to content

PYTHON-4669 - Update More APIs for Motor Compatibility #1815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions gridfs/asynchronous/grid_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,20 +1176,22 @@ def __getattr__(self, name: str) -> Any:
raise AttributeError("GridIn object has no attribute '%s'" % name)

def __setattr__(self, name: str, value: Any) -> None:
if _IS_SYNC:
# For properties of this instance like _buffer, or descriptors set on
# the class like filename, use regular __setattr__
if name in self.__dict__ or name in self.__class__.__dict__:
object.__setattr__(self, name, value)
else:
# For properties of this instance like _buffer, or descriptors set on
# the class like filename, use regular __setattr__
if name in self.__dict__ or name in self.__class__.__dict__:
object.__setattr__(self, name, value)
else:
if _IS_SYNC:
# All other attributes are part of the document in db.fs.files.
# Store them to be sent to server on close() or if closed, send
# them now.
self._file[name] = value
if self._closed:
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
else:
object.__setattr__(self, name, value)
else:
raise AttributeError(
"AsyncGridIn does not support __setattr__. Use AsyncGridIn.set() instead"
)

async def set(self, name: str, value: Any) -> None:
# For properties of this instance like _buffer, or descriptors set on
Expand Down Expand Up @@ -1484,6 +1486,17 @@ def __init__(
_file: Any
_chunk_iter: Any

async def __anext__(self) -> bytes:
return super().__next__()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect. We can't call super().__next__() because that does blocking I/O.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good catch. We don't have an async equivalent here unless we write one ourselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOBase implements next using readline:

IOBase (and its subclasses) supports the iterator protocol, meaning that an IOBase object can be iterated over yielding the lines in a stream. Lines are defined slightly differently depending on whether the stream is a binary stream (yielding bytes), or a text stream (yielding character strings). See readline() below.

https://docs.python.org/3/library/io.html#io.IOBase

Copy link
Contributor Author

@NoahStapp NoahStapp Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't an asyncio version of readline, so we'd need to write our own. The canonical way to do so appears to be with threads (https://stackoverflow.com/questions/34699948/does-asyncio-supports-asynchronous-i-o-for-file-operations), at which point I question if the performance gained by not blocking the loop is more than the cost of thread overhead. The official CPython forums have similar concerns at the OS level: https://discuss.python.org/t/asyncio-for-files/31077/15.


def __next__(self) -> bytes: # noqa: F811, RUF100
if _IS_SYNC:
return super().__next__()
else:
raise TypeError(
"AsyncGridOut does not support synchronous iteration. Use `async for` instead"
)

async def open(self) -> None:
if not self._file:
_disallow_transactions(self._session)
Expand Down Expand Up @@ -1511,6 +1524,7 @@ async def readchunk(self) -> bytes:
"""Reads a chunk at a time. If the current position is within a
chunk the remainder of the chunk is returned.
"""
await self.open()
received = len(self._buffer) - self._buffer_pos
chunk_data = EMPTY
chunk_size = int(self.chunk_size)
Expand Down
28 changes: 20 additions & 8 deletions gridfs/synchronous/grid_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,20 +1166,22 @@ def __getattr__(self, name: str) -> Any:
raise AttributeError("GridIn object has no attribute '%s'" % name)

def __setattr__(self, name: str, value: Any) -> None:
if _IS_SYNC:
# For properties of this instance like _buffer, or descriptors set on
# the class like filename, use regular __setattr__
if name in self.__dict__ or name in self.__class__.__dict__:
object.__setattr__(self, name, value)
else:
# For properties of this instance like _buffer, or descriptors set on
# the class like filename, use regular __setattr__
if name in self.__dict__ or name in self.__class__.__dict__:
object.__setattr__(self, name, value)
else:
if _IS_SYNC:
# All other attributes are part of the document in db.fs.files.
# Store them to be sent to server on close() or if closed, send
# them now.
self._file[name] = value
if self._closed:
self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
else:
object.__setattr__(self, name, value)
else:
raise AttributeError(
"GridIn does not support __setattr__. Use GridIn.set() instead"
)

def set(self, name: str, value: Any) -> None:
# For properties of this instance like _buffer, or descriptors set on
Expand Down Expand Up @@ -1472,6 +1474,15 @@ def __init__(
_file: Any
_chunk_iter: Any

def __next__(self) -> bytes:
return super().__next__()

def __next__(self) -> bytes: # noqa: F811, RUF100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to avoid the duplicate def __next__(self) definitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a limitation of the synchro script: it will translate the async __anext__ into __next__, but we want to have a separate __next__ for the async class that raises an error. That explicit __next__ will also get ported to the synchronous class unfortunately, giving us the duplicate defs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but can we workaround that? The duplicate code is strange the read. There's also a runtime perf cost to overriding a method just to call the super class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't a simple way to workaround it, no. We could change the definition to be less confusing, like this:

    async def __anext__(self) -> bytes:
        return super().__next__()

    if not _IS_SYNC:

        def __next__(self) -> bytes:  # noqa: F811, RUF100
            raise TypeError(
                "AsyncGridOut does not support synchronous iteration. Use `async for` instead"
            )

Which would synchronize to

    def __next__(self) -> bytes:
        return super().__next__()

    if not _IS_SYNC:

        def __next__(self) -> bytes:  # noqa: F811, RUF100
            raise TypeError("GridOut does not support synchronous iteration. Use `for` instead")

We can also add a comment explaining why the duplicate def exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, but how about:

    if not _IS_SYNC:
        async def __anext__(self) -> bytes:
            return await self.readline()

        def __next__(self) -> bytes:  # noqa: F811, RUF100
            raise TypeError(
                "AsyncGridOut does not support synchronous iteration. Use `async for` instead"
            )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot we had our own async readline, good catch. This looks like we'd read a single line or every byte if the file wasn't line-delimited. Is that the intended behavior for iteration here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what IOBase is supposed to do and AsyncGridOut iteration should match the sync version. We also need to remove IOBase from the async.

Copy link
Contributor Author

@NoahStapp NoahStapp Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, follow-up PR: #1821.

if _IS_SYNC:
return super().__next__()
else:
raise TypeError("GridOut does not support synchronous iteration. Use `for` instead")

def open(self) -> None:
if not self._file:
_disallow_transactions(self._session)
Expand Down Expand Up @@ -1499,6 +1510,7 @@ def readchunk(self) -> bytes:
"""Reads a chunk at a time. If the current position is within a
chunk the remainder of the chunk is returned.
"""
self.open()
received = len(self._buffer) - self._buffer_pos
chunk_data = EMPTY
chunk_size = int(self.chunk_size)
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,18 @@ module = ["service_identity.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["pymongo.synchronous.*", "gridfs.synchronous.*"]
module = ["pymongo.synchronous.*"]
warn_unused_ignores = false
disable_error_code = ["unused-coroutine"]

[[tool.mypy.overrides]]
module = ["pymongo.asynchronous.*"]
warn_unused_ignores = false

[[tool.mypy.overrides]]
module = ["gridfs.synchronous.*"]
warn_unused_ignores = false
disable_error_code = ["unused-coroutine", "no-redef"]

[tool.ruff]
target-version = "py37"
Expand Down
Loading