Skip to content

Commit 650ad7d

Browse files
Added point in time support and the Search.iterate() method
1 parent 76a57fd commit 650ad7d

File tree

5 files changed

+167
-0
lines changed

5 files changed

+167
-0
lines changed

elasticsearch_dsl/_async/search.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import contextlib
19+
1820
from elasticsearch.exceptions import ApiError
1921
from elasticsearch.helpers import async_scan
2022

@@ -92,6 +94,8 @@ async def scan(self):
9294
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
9395
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
9496
97+
The ``iterate()`` method should be preferred, as it provides similar
98+
functionality using a point in time.
9599
"""
96100
es = get_connection(self._using)
97101

@@ -113,6 +117,56 @@ async def delete(self):
113117
)
114118
)
115119

120+
@contextlib.asynccontextmanager
async def point_in_time(self, keep_alive="1m"):
    """
    Open a point in time (pit) that can be used across several searches.

    :arg keep_alive: the time to live for the point in time, renewed with
        each new search request

    This method implements an async context manager that returns a search
    object configured to operate within the created pit. The pit is closed
    when the context exits, even if an exception is raised inside the
    ``async with`` block. The following example shows how to paginate
    through all the documents of an index::

        page_size = 10
        async with AsyncSearch(index="my-index")[:page_size].point_in_time() as s:
            while True:
                r = await s.execute()  # get a page of results
                # ... do something with r.hits

                if len(r.hits) < page_size:
                    break  # we reached the end
                s = r.search_after()
    """
    es = get_connection(self._using)

    pit = await es.open_point_in_time(
        index=self._index or "*", keep_alive=keep_alive
    )
    # The pit request carries the index, so the search itself must not
    # repeat it; index() without arguments clears it on the copy.
    search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
    if not search._sort:
        # a tiebreaker sort is required for search_after pagination
        search = search.sort("_shard_doc")
    try:
        yield search
    finally:
        # always release the pit, even when the caller's block raises
        await es.close_point_in_time(id=pit["id"])
151+
152+
async def iterate(self, keep_alive="1m"):
    """
    Return a generator that iterates over all the documents matching the query.

    :arg keep_alive: the time to live for the point in time, renewed with each new search request

    This method uses a point in time to provide consistent results even when
    the underlying index is changing. It should be preferred over ``scan()``.
    """
    async with self.point_in_time(keep_alive=keep_alive) as search:
        while True:
            response = await search.execute()
            for hit in response:
                yield hit
            # an empty page means we have exhausted the results; stop
            # before calling search_after(), which needs at least one hit
            if len(response.hits) == 0:
                break
            search = response.search_after()
169+
116170

117171
class AsyncMultiSearch(MultiSearchBase):
118172
"""

elasticsearch_dsl/_sync/search.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import contextlib
19+
1820
from elasticsearch.exceptions import ApiError
1921
from elasticsearch.helpers import scan
2022

@@ -88,6 +90,8 @@ def scan(self):
8890
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
8991
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
9092
93+
The ``iterate()`` method should be preferred, as it provides similar
94+
functionality using a point in time.
9195
"""
9296
es = get_connection(self._using)
9397

@@ -105,6 +109,54 @@ def delete(self):
105109
es.delete_by_query(index=self._index, body=self.to_dict(), **self._params)
106110
)
107111

112+
@contextlib.contextmanager
def point_in_time(self, keep_alive="1m"):
    """
    Open a point in time (pit) that can be used across several searches.

    :arg keep_alive: the time to live for the point in time, renewed with
        each new search request

    This method implements a context manager that returns a search object
    configured to operate within the created pit. The pit is closed when
    the context exits, even if an exception is raised inside the ``with``
    block. The following example shows how to paginate through all the
    documents of an index::

        page_size = 10
        with Search(index="my-index")[:page_size].point_in_time() as s:
            while True:
                r = s.execute()  # get a page of results
                # ... do something with r.hits

                if len(r.hits) < page_size:
                    break  # we reached the end
                s = r.search_after()
    """
    es = get_connection(self._using)

    pit = es.open_point_in_time(index=self._index or "*", keep_alive=keep_alive)
    # The pit request carries the index, so the search itself must not
    # repeat it; index() without arguments clears it on the copy.
    search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
    if not search._sort:
        # a tiebreaker sort is required for search_after pagination
        search = search.sort("_shard_doc")
    try:
        yield search
    finally:
        # always release the pit, even when the caller's block raises
        es.close_point_in_time(id=pit["id"])
141+
142+
def iterate(self, keep_alive="1m"):
    """
    Return a generator that iterates over all the documents matching the query.

    :arg keep_alive: the time to live for the point in time, renewed with each new search request

    This method uses a point in time to provide consistent results even when
    the underlying index is changing. It should be preferred over ``scan()``.
    """
    with self.point_in_time(keep_alive=keep_alive) as search:
        while True:
            response = search.execute()
            for hit in response:
                yield hit
            # an empty page means we have exhausted the results; stop
            # before calling search_after(), which needs at least one hit
            if len(response.hits) == 0:
                break
            search = response.search_after()
159+
108160

109161
class MultiSearch(MultiSearchBase):
110162
"""

tests/test_integration/_async/test_search.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,37 @@ async def test_search_after_no_results(async_data_client):
179179
await r.search_after()
180180

181181

182+
@pytest.mark.asyncio
async def test_point_in_time(async_data_client):
    """Paginate through all 52 flat-git commits inside a single pit."""
    page_size = 7
    collected = []
    async with AsyncSearch(index="flat-git")[:page_size].point_in_time(
        keep_alive="30s"
    ) as s:
        pit_id = s._extra["pit"]["id"]
        while True:
            r = await s.execute()
            collected.extend(r.hits)
            if len(r.hits) < page_size:
                break
            s = r.search_after()
            # every follow-up page must reuse the same pit with its ttl
            assert pit_id == s._extra["pit"]["id"]
            assert "30s" == s._extra["pit"]["keep_alive"]

    assert len(collected) == 52
    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in collected}
201+
202+
203+
@pytest.mark.asyncio
async def test_iterate(async_data_client):
    """iterate() must yield every document in the index exactly once."""
    search = AsyncSearch(index="flat-git")

    docs = [hit async for hit in search.iterate()]

    assert len(docs) == 52
    assert {d["_id"] for d in FLAT_DATA} == {doc.meta.id for doc in docs}
211+
212+
182213
@pytest.mark.asyncio
183214
async def test_response_is_cached(async_data_client):
184215
s = Repository.search()

tests/test_integration/_sync/test_search.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,35 @@ def test_search_after_no_results(data_client):
171171
r.search_after()
172172

173173

174+
@pytest.mark.sync
def test_point_in_time(data_client):
    """Paginate through all 52 flat-git commits inside a single pit."""
    page_size = 7
    collected = []
    with Search(index="flat-git")[:page_size].point_in_time(keep_alive="30s") as s:
        pit_id = s._extra["pit"]["id"]
        while True:
            r = s.execute()
            collected.extend(r.hits)
            if len(r.hits) < page_size:
                break
            s = r.search_after()
            # every follow-up page must reuse the same pit with its ttl
            assert pit_id == s._extra["pit"]["id"]
            assert "30s" == s._extra["pit"]["keep_alive"]

    assert len(collected) == 52
    assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in collected}
191+
192+
193+
@pytest.mark.sync
def test_iterate(data_client):
    """iterate() must yield every document in the index exactly once."""
    search = Search(index="flat-git")

    docs = list(search.iterate())

    assert len(docs) == 52
    assert {d["_id"] for d in FLAT_DATA} == {doc.meta.id for doc in docs}
201+
202+
174203
@pytest.mark.sync
175204
def test_response_is_cached(data_client):
176205
s = Repository.search()

utils/run-unasync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def main(check=False):
7272
"async_sleep": "sleep",
7373
"assert_awaited_once_with": "assert_called_once_with",
7474
"pytest_asyncio": "pytest",
75+
"asynccontextmanager": "contextmanager",
7576
}
7677
rules = [
7778
unasync.Rule(

0 commit comments

Comments
 (0)