
Commit ccca1fb

Added point in time support and the Search.iterate() method
1 parent 76a57fd commit ccca1fb

File tree

5 files changed: +169 -0 lines changed

elasticsearch_dsl/_async/search.py

Lines changed: 55 additions & 0 deletions

@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.

+import contextlib
+
 from elasticsearch.exceptions import ApiError
 from elasticsearch.helpers import async_scan

@@ -92,6 +94,8 @@ async def scan(self):
         pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
         https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan

+        The ``iterate()`` method should be preferred, as it provides similar
+        functionality using a point in time.
         """
         es = get_connection(self._using)

@@ -113,6 +117,57 @@ async def delete(self):
             )
         )

+    @contextlib.asynccontextmanager
+    async def point_in_time(self, keep_alive="1m"):
+        """
+        Open a point in time (pit) that can be used across several searches.
+
+        This method implements a context manager that returns a search object
+        configured to operate within the created pit.
+
+        :arg keep_alive: the time to live for the point in time, renewed with each search request
+
+        The following example shows how to paginate through all the documents of an index::
+
+            page_size = 10
+            async with AsyncSearch(index="my-index")[:page_size].point_in_time() as s:
+                while True:
+                    r = await s.execute()  # get a page of results
+                    # ... do something with r.hits
+
+                    if len(r.hits) < page_size:
+                        break  # we reached the end
+                    s = r.search_after()
+        """
+        es = get_connection(self._using)
+
+        pit = await es.open_point_in_time(
+            index=self._index or "*", keep_alive=keep_alive
+        )
+        search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
+        if not search._sort:
+            search = search.sort("_shard_doc")
+        yield search
+        await es.close_point_in_time(id=pit["id"])
+
+    async def iterate(self, keep_alive="1m"):
+        """
+        Return a generator that iterates over all the documents matching the query.
+
+        This method uses a point in time to provide consistent results even when
+        the index is changing. It should be preferred over ``scan()``.
+
+        :arg keep_alive: the time to live for the point in time, renewed with each new search request
+        """
+        async with self.point_in_time(keep_alive=keep_alive) as s:
+            while True:
+                r = await s.execute()
+                for hit in r:
+                    yield hit
+                if len(r.hits) == 0:
+                    break
+                s = r.search_after()

 class AsyncMultiSearch(MultiSearchBase):
     """

elasticsearch_dsl/_sync/search.py

Lines changed: 53 additions & 0 deletions

@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.

+import contextlib
+
 from elasticsearch.exceptions import ApiError
 from elasticsearch.helpers import scan

@@ -88,6 +90,8 @@ def scan(self):
         pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
         https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan

+        The ``iterate()`` method should be preferred, as it provides similar
+        functionality using a point in time.
         """
         es = get_connection(self._using)

@@ -105,6 +109,55 @@ def delete(self):
             es.delete_by_query(index=self._index, body=self.to_dict(), **self._params)
         )

+    @contextlib.contextmanager
+    def point_in_time(self, keep_alive="1m"):
+        """
+        Open a point in time (pit) that can be used across several searches.
+
+        This method implements a context manager that returns a search object
+        configured to operate within the created pit.
+
+        :arg keep_alive: the time to live for the point in time, renewed with each search request
+
+        The following example shows how to paginate through all the documents of an index::
+
+            page_size = 10
+            with Search(index="my-index")[:page_size].point_in_time() as s:
+                while True:
+                    r = s.execute()  # get a page of results
+                    # ... do something with r.hits
+
+                    if len(r.hits) < page_size:
+                        break  # we reached the end
+                    s = r.search_after()
+        """
+        es = get_connection(self._using)
+
+        pit = es.open_point_in_time(index=self._index or "*", keep_alive=keep_alive)
+        search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
+        if not search._sort:
+            search = search.sort("_shard_doc")
+        yield search
+        es.close_point_in_time(id=pit["id"])
+
+    def iterate(self, keep_alive="1m"):
+        """
+        Return a generator that iterates over all the documents matching the query.
+
+        This method uses a point in time to provide consistent results even when
+        the index is changing. It should be preferred over ``scan()``.
+
+        :arg keep_alive: the time to live for the point in time, renewed with each new search request
+        """
+        with self.point_in_time(keep_alive=keep_alive) as s:
+            while True:
+                r = s.execute()
+                for hit in r:
+                    yield hit
+                if len(r.hits) == 0:
+                    break
+                s = r.search_after()

 class MultiSearch(MultiSearchBase):
     """

tests/test_integration/_async/test_search.py

Lines changed: 31 additions & 0 deletions

@@ -179,6 +179,37 @@ async def test_search_after_no_results(async_data_client):
         await r.search_after()


+@pytest.mark.asyncio
+async def test_point_in_time(async_data_client):
+    page_size = 7
+    commits = []
+    async with AsyncSearch(index="flat-git")[:page_size].point_in_time(
+        keep_alive="30s"
+    ) as s:
+        pit_id = s._extra["pit"]["id"]
+        while True:
+            r = await s.execute()
+            commits += r.hits
+            if len(r.hits) < page_size:
+                break
+            s = r.search_after()
+            assert pit_id == s._extra["pit"]["id"]
+            assert "30s" == s._extra["pit"]["keep_alive"]
+
+    assert 52 == len(commits)
+    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}
+
+
+@pytest.mark.asyncio
+async def test_iterate(async_data_client):
+    s = AsyncSearch(index="flat-git")
+
+    commits = [commit async for commit in s.iterate()]
+
+    assert 52 == len(commits)
+    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}
+
+
 @pytest.mark.asyncio
 async def test_response_is_cached(async_data_client):
     s = Repository.search()

tests/test_integration/_sync/test_search.py

Lines changed: 29 additions & 0 deletions

@@ -171,6 +171,35 @@ def test_search_after_no_results(data_client):
         r.search_after()


+@pytest.mark.sync
+def test_point_in_time(data_client):
+    page_size = 7
+    commits = []
+    with Search(index="flat-git")[:page_size].point_in_time(keep_alive="30s") as s:
+        pit_id = s._extra["pit"]["id"]
+        while True:
+            r = s.execute()
+            commits += r.hits
+            if len(r.hits) < page_size:
+                break
+            s = r.search_after()
+            assert pit_id == s._extra["pit"]["id"]
+            assert "30s" == s._extra["pit"]["keep_alive"]
+
+    assert 52 == len(commits)
+    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}
+
+
+@pytest.mark.sync
+def test_iterate(data_client):
+    s = Search(index="flat-git")
+
+    commits = [commit for commit in s.iterate()]
+
+    assert 52 == len(commits)
+    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}
+
+
 @pytest.mark.sync
 def test_response_is_cached(data_client):
     s = Repository.search()

utils/run-unasync.py

Lines changed: 1 addition & 0 deletions

@@ -72,6 +72,7 @@ def main(check=False):
         "async_sleep": "sleep",
         "assert_awaited_once_with": "assert_called_once_with",
         "pytest_asyncio": "pytest",
+        "asynccontextmanager": "contextmanager",
     }
     rules = [
         unasync.Rule(
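This replacement entry is what lets the async module remain the single source of truth: when generating ``_sync/search.py``, unasync rewrites ``asynccontextmanager`` to ``contextmanager`` alongside its built-in rules (``async def`` to ``def``, ``await`` removal, and so on). A rough sketch of the wiring, with directory paths assumed from the repository layout:

    import unasync

    rules = [
        unasync.Rule(
            fromdir="elasticsearch_dsl/_async/",
            todir="elasticsearch_dsl/_sync/",
            additional_replacements={
                # the entry added by this commit
                "asynccontextmanager": "contextmanager",
            },
        )
    ]
    unasync.unasync_files(["elasticsearch_dsl/_async/search.py"], rules)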
