|
22 | 22 | from bson.raw_bson import RawBSONDocument
|
23 | 23 | from bson.timestamp import Timestamp
|
24 | 24 | from pymongo import _csot, common
|
25 |
| -from pymongo.aggregation import ( |
26 |
| - _AggregationCommand, |
27 |
| - _CollectionAggregationCommand, |
28 |
| - _DatabaseAggregationCommand, |
29 |
| -) |
30 | 25 | from pymongo.collation import validate_collation_or_none
|
31 |
| -from pymongo.command_cursor import CommandCursor |
32 | 26 | from pymongo.errors import (
|
33 | 27 | ConnectionFailure,
|
34 | 28 | CursorNotFound,
|
|
37 | 31 | PyMongoError,
|
38 | 32 | )
|
39 | 33 | from pymongo.operations import _Op
|
| 34 | +from pymongo.synchronous.aggregation import ( |
| 35 | + _AggregationCommand, |
| 36 | + _CollectionAggregationCommand, |
| 37 | + _DatabaseAggregationCommand, |
| 38 | +) |
| 39 | +from pymongo.synchronous.command_cursor import CommandCursor |
40 | 40 | from pymongo.typings import _CollationIn, _DocumentType, _Pipeline
|
41 | 41 |
|
| 42 | +_IS_SYNC = True |
| 43 | + |
42 | 44 | # The change streams spec considers the following server errors from the
|
43 | 45 | # getMore command non-resumable. All other getMore errors are resumable.
|
44 | 46 | _RESUMABLE_GETMORE_ERRORS = frozenset(
|
|
65 | 67 |
|
66 | 68 |
|
67 | 69 | if TYPE_CHECKING:
|
68 |
| - from pymongo.client_session import ClientSession |
69 |
| - from pymongo.collection import Collection |
70 |
| - from pymongo.database import Database |
71 |
| - from pymongo.mongo_client import MongoClient |
72 |
| - from pymongo.pool import Connection |
| 70 | + from pymongo.synchronous.client_session import ClientSession |
| 71 | + from pymongo.synchronous.collection import Collection |
| 72 | + from pymongo.synchronous.database import Database |
| 73 | + from pymongo.synchronous.mongo_client import MongoClient |
| 74 | + from pymongo.synchronous.pool import Connection |
73 | 75 |
|
74 | 76 |
|
75 | 77 | def _resumable(exc: PyMongoError) -> bool:
|
@@ -100,7 +102,9 @@ class ChangeStream(Generic[_DocumentType]):
|
100 | 102 | def __init__(
|
101 | 103 | self,
|
102 | 104 | target: Union[
|
103 |
| - MongoClient[_DocumentType], Database[_DocumentType], Collection[_DocumentType] |
| 105 | + MongoClient[_DocumentType], |
| 106 | + Database[_DocumentType], |
| 107 | + Collection[_DocumentType], |
104 | 108 | ],
|
105 | 109 | pipeline: Optional[_Pipeline],
|
106 | 110 | full_document: Optional[str],
|
@@ -149,6 +153,8 @@ def __init__(
|
149 | 153 | self._closed = False
|
150 | 154 | self._timeout = self._target._timeout
|
151 | 155 | self._show_expanded_events = show_expanded_events
|
| 156 | + |
| 157 | + def _initialize_cursor(self) -> None: |
152 | 158 | # Initialize cursor.
|
153 | 159 | self._cursor = self._create_cursor()
|
154 | 160 |
|
@@ -180,7 +186,7 @@ def _change_stream_options(self) -> dict[str, Any]:
|
180 | 186 | else:
|
181 | 187 | options["resumeAfter"] = resume_token
|
182 | 188 |
|
183 |
| - if self._start_at_operation_time is not None: |
| 189 | + elif self._start_at_operation_time is not None: |
184 | 190 | options["startAtOperationTime"] = self._start_at_operation_time
|
185 | 191 |
|
186 | 192 | if self._show_expanded_events:
|
|
0 commit comments