Commit 662298a
Allow retries for statuses other than 429 in bulk streaming
Closes elastic#1004. This updates elastic#1005 to work for both the async and sync clients and adds tests.
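To make the change concrete, here is a minimal usage sketch for the sync helper (the client URL, index name, and documents are hypothetical; only `retry_for_status_callback`, `max_retries`, and `initial_backoff` come from this change):

# Hedged sketch: retry bulk items rejected with 429 *or* 503 instead of
# only the default 429. Client URL, index name, and docs are assumptions.
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")  # assumed local cluster

def retry_for_429_and_503(status: int) -> bool:
    # Receives each failed item's HTTP status; True means "retry this chunk".
    return status in (429, 503)

docs = ({"_index": "my-index", "value": i} for i in range(100))

for ok, item in helpers.streaming_bulk(
    es,
    docs,
    retry_for_status_callback=retry_for_429_and_503,
    max_retries=3,      # give up on a chunk after 3 retries
    initial_backoff=2,  # wait 2s, then 4s, then 8s between attempts
):
    if not ok:
        print("failed:", item)

The callback receives the HTTP status of each failed item (or of a raised ApiError) and returns whether the chunk should be retried.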

4 files changed (+138, −20 lines)

elasticsearch/_async/helpers.py

Lines changed: 25 additions & 10 deletions
@@ -158,6 +158,12 @@ async def azip(
         pass
 
 
+def _retry_for_status(status: int) -> bool:
+    if status == 429:
+        return True
+    return False
+
+
 async def async_streaming_bulk(
     client: AsyncElasticsearch,
     actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
@@ -167,6 +173,7 @@ async def async_streaming_bulk(
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
     ] = expand_action,
+    retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
     raise_on_exception: bool = True,
     max_retries: int = 0,
     initial_backoff: float = 2,
@@ -185,10 +192,11 @@
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    asyncio.sleep**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling asyncio.sleep**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
     :arg actions: iterable or async iterable containing the actions to be executed
@@ -201,8 +209,12 @@
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).
+    :arg retry_for_status_callback: callback executed on each item's status;
+        should return ``True`` if the status requires a retry and ``False``
+        if not (by default, only status ``429`` is retried).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        a status for which ``retry_for_status_callback`` returns ``True``
+        (``429`` by default) is received, set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -267,11 +279,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
 
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and retry_for_status_callback returns True (429 by default)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and retry_for_status_callback(info["status"])
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects strings so we need to
@@ -284,8 +296,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                     yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status for which retry_for_status_callback
+                # returns True (429 by default) since we will retry them
+                if attempt == max_retries or not retry_for_status_callback(
+                    e.status_code
+                ):
                     raise
             else:
                 if not to_retry:
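The async helper accepts the same callback. A sketch under the same assumptions (hypothetical cluster URL and index name; parameter names come from this diff, and the backoff wait here awaits asyncio.sleep, so it does not block the event loop):

# Hedged sketch for the async client; the URL and index name are assumptions.
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def main() -> None:
    es = AsyncElasticsearch("http://localhost:9200")  # assumed cluster
    docs = ({"_index": "my-index", "value": i} for i in range(100))

    async for ok, item in async_streaming_bulk(
        es,
        docs,
        retry_for_status_callback=lambda status: status in (429, 503),
        max_retries=3,
        initial_backoff=2,
    ):
        if not ok:
            print("failed:", item)
    await es.close()

asyncio.run(main())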

elasticsearch/helpers/actions.py

Lines changed: 25 additions & 10 deletions
@@ -355,6 +355,12 @@ def _process_bulk_chunk(
     yield from gen
 
 
+def _retry_for_status(status: int) -> bool:
+    if status == 429:
+        return True
+    return False
+
+
 def streaming_bulk(
     client: Elasticsearch,
     actions: Iterable[_TYPE_BULK_ACTION],
@@ -364,6 +370,7 @@ def streaming_bulk(
     expand_action_callback: Callable[
         [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
     ] = expand_action,
+    retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
    raise_on_exception: bool = True,
     max_retries: int = 0,
     initial_backoff: float = 2,
@@ -382,10 +389,11 @@
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    time.sleep which will block**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling time.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
@@ -398,8 +406,12 @@
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg retry_for_status_callback: callback executed on each item's status;
+        should return ``True`` if the status requires a retry and ``False``
+        if not (by default, only status ``429`` is retried).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        a status for which ``retry_for_status_callback`` returns ``True``
+        (``429`` by default) is received, set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -451,11 +463,11 @@ def streaming_bulk(
 
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and retry_for_status_callback returns True (429 by default)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and retry_for_status_callback(info["status"])
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects bytes so we need to
@@ -468,8 +480,11 @@ def streaming_bulk(
                     yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status for which retry_for_status_callback
+                # returns True (429 by default) since we will retry them
+                if attempt == max_retries or not retry_for_status_callback(
+                    e.status_code
+                ):
                     raise
             else:
                 if not to_retry:
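For reference, the wait schedule described in the docstring can be spelled out numerically. This illustration mirrors the documented formula `initial_backoff * 2**retry_number` capped at `max_backoff` (the defaults below are the library's documented defaults), not the helper's internal code:

# Illustrative only: the documented backoff between retries of the same
# chunk is initial_backoff * 2**retry_number, capped at max_backoff.
initial_backoff = 2.0   # library default
max_backoff = 600.0     # library default

for retry_number in range(5):
    delay = min(max_backoff, initial_backoff * 2**retry_number)
    print(f"retry {retry_number + 1}: wait {delay:.0f}s")  # 2, 4, 8, 16, 32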

test_elasticsearch/test_async/test_server/test_helpers.py

Lines changed: 44 additions & 0 deletions
@@ -292,6 +292,50 @@ async def streaming_bulk():
         await streaming_bulk()
         assert 4 == failing_client._called
 
+    async def test_connection_timeout_is_retried_with_retry_status_callback(
+        self, async_client
+    ):
+        failing_client = FailingBulkClient(
+            async_client,
+            fail_with=ApiError(
+                message="Connection timed out!",
+                body={},
+                meta=ApiResponseMeta(
+                    status=522, headers={}, http_version="1.1", duration=0, node=None
+                ),
+            ),
+        )
+        docs = [
+            {"_index": "i", "_id": 47, "f": "v"},
+            {"_index": "i", "_id": 45, "f": "v"},
+            {"_index": "i", "_id": 42, "f": "v"},
+        ]
+
+        def _retry_for_connection_timeout(status):
+            if status == 522:
+                return True
+            return False
+
+        results = [
+            x
+            async for x in helpers.async_streaming_bulk(
+                failing_client,
+                docs,
+                raise_on_exception=False,
+                raise_on_error=False,
+                chunk_size=1,
+                retry_for_status_callback=_retry_for_connection_timeout,
+                max_retries=1,
+                initial_backoff=0,
+            )
+        ]
+        assert 3 == len(results)
+        assert [True, True, True] == [r[0] for r in results]
+        await async_client.indices.refresh(index="i")
+        res = await async_client.search(index="i")
+        assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+        assert 4 == failing_client._called
+
 
 class TestBulk(object):
     async def test_bulk_works_with_single_item(self, async_client):
test_elasticsearch/test_server/test_helpers.py

Lines changed: 44 additions & 0 deletions
@@ -288,6 +288,50 @@ def streaming_bulk():
     assert 4 == failing_client._called
 
 
+def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
+    failing_client = FailingBulkClient(
+        sync_client,
+        fail_with=ApiError(
+            message="Connection timed out!",
+            body={},
+            meta=ApiResponseMeta(
+                status=522, headers={}, http_version="1.1", duration=0, node=None
+            ),
+        ),
+    )
+    docs = [
+        {"_index": "i", "_id": 47, "f": "v"},
+        {"_index": "i", "_id": 45, "f": "v"},
+        {"_index": "i", "_id": 42, "f": "v"},
+    ]
+
+    def _retry_for_connection_timeout(status):
+        if status == 522:
+            return True
+        return False
+
+    results = list(
+        helpers.streaming_bulk(
+            failing_client,
+            docs,
+            index="i",
+            raise_on_exception=False,
+            raise_on_error=False,
+            chunk_size=1,
+            retry_for_status_callback=_retry_for_connection_timeout,
+            max_retries=1,
+            initial_backoff=0,
+        )
+    )
+    assert 3 == len(results)
+    assert [True, True, True] == [r[0] for r in results]
+    sync_client.indices.refresh(index="i")
+    res = sync_client.search(index="i")
+    assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+    assert 4 == failing_client._called
+
+
 def test_bulk_works_with_single_item(sync_client):
     docs = [{"answer": 42, "_id": 1}]
     success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)