Skip to content

Commit 0e55195

Browse files
authored
Merge branch 'main' into feat/DI-1832/chunkedbatch-php
2 parents 4561b7d + 28e583b commit 0e55195

File tree

6 files changed

+296
-53
lines changed

6 files changed

+296
-53
lines changed

.github/.cache_version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.17
1+
1.0.18

clients/algoliasearch-client-python/algoliasearch/search/client.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ async def chunked_batch(
417417
objects: List[Dict[str, Any]],
418418
action: Action = "addObject",
419419
wait_for_tasks: bool = False,
420+
batch_size: int = 1000,
420421
request_options: Optional[Union[dict, RequestOptions]] = None,
421422
) -> List[BatchResponse]:
422423
"""
@@ -426,7 +427,7 @@ async def chunked_batch(
426427
responses: List[BatchResponse] = []
427428
for i, obj in enumerate(objects):
428429
requests.append(BatchRequest(action=action, body=obj))
429-
if i % 1000 == 0:
430+
if i % batch_size == 0:
430431
responses.append(
431432
await self.batch(
432433
index_name=index_name,
@@ -446,14 +447,15 @@ async def replace_all_objects(
446447
self,
447448
index_name: str,
448449
objects: List[Dict[str, Any]],
450+
batch_size: int = 1000,
449451
request_options: Optional[Union[dict, RequestOptions]] = None,
450452
) -> List[ApiResponse[str]]:
451453
"""
452454
Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data.
453455
"""
454456
tmp_index_name = self.create_temporary_name(index_name)
455-
responses: List[ApiResponse[str]] = []
456-
copy_resp = await self.operation_index(
457+
458+
copy_operation_response = await self.operation_index(
457459
index_name=index_name,
458460
operation_index_params=OperationIndexParams(
459461
operation="copy",
@@ -466,34 +468,35 @@ async def replace_all_objects(
466468
),
467469
request_options=request_options,
468470
)
471+
await self.wait_for_task(
472+
index_name=index_name, task_id=copy_operation_response.task_id
473+
)
469474

470-
responses.append(copy_resp)
471-
472-
await self.wait_for_task(index_name=index_name, task_id=copy_resp.task_id)
473-
474-
save_resps = await self.chunked_batch(
475+
batch_responses = await self.chunked_batch(
475476
index_name=tmp_index_name,
476477
objects=objects,
477478
wait_for_tasks=True,
479+
batch_size=batch_size,
478480
request_options=request_options,
479481
)
480482

481-
responses += save_resps
482-
483-
move_resp = await self.operation_index(
483+
move_operation_response = await self.operation_index(
484484
index_name=tmp_index_name,
485485
operation_index_params=OperationIndexParams(
486486
operation="move",
487487
destination=index_name,
488488
),
489489
request_options=request_options,
490490
)
491+
await self.wait_for_task(
492+
index_name=tmp_index_name, task_id=move_operation_response.task_id
493+
)
491494

492-
responses.append(move_resp)
493-
494-
await self.wait_for_task(index_name=tmp_index_name, task_id=move_resp.task_id)
495-
496-
return responses
495+
return {
496+
"copy_operation_response": copy_operation_response,
497+
"batch_responses": batch_responses,
498+
"move_operation_response": move_operation_response,
499+
}
497500

498501
async def add_api_key_with_http_info(
499502
self,

templates/python/search_helpers.mustache

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@
213213
objects: List[Dict[str, Any]],
214214
action: Action = "addObject",
215215
wait_for_tasks: bool = False,
216+
batch_size: int = 1000,
216217
request_options: Optional[Union[dict, RequestOptions]] = None,
217218
) -> List[BatchResponse]:
218219
"""
@@ -222,7 +223,7 @@
222223
responses: List[BatchResponse] = []
223224
for i, obj in enumerate(objects):
224225
requests.append(BatchRequest(action=action, body=obj))
225-
if i % 1000 == 0:
226+
if i % batch_size == 0:
226227
responses.append(
227228
await self.batch(
228229
index_name=index_name,
@@ -238,46 +239,57 @@
238239
)
239240
return responses
240241

241-
async def replace_all_objects(self, index_name: str, objects: List[Dict[str, Any]], request_options: Optional[Union[dict, RequestOptions]] = None) -> List[ApiResponse[str]]:
242+
async def replace_all_objects(
243+
self,
244+
index_name: str,
245+
objects: List[Dict[str, Any]],
246+
batch_size: int = 1000,
247+
request_options: Optional[Union[dict, RequestOptions]] = None,
248+
) -> List[ApiResponse[str]]:
242249
"""
243250
Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data.
244251
"""
245252
tmp_index_name = self.create_temporary_name(index_name)
246-
responses: List[ApiResponse[str]] = []
247-
copy_resp = await self.operation_index(
248-
index_name=index_name,
249-
operation_index_params=OperationIndexParams(
250-
operation="copy",
251-
destination=tmp_index_name,
252-
scope=[ScopeType("settings"), ScopeType("synonyms"), ScopeType("rules")]
253-
),
254-
request_options=request_options,
255-
)
256-
257-
responses.append(copy_resp)
258-
259-
await self.wait_for_task(index_name=index_name, task_id=copy_resp.task_id)
260253

261-
save_resps = await self.chunked_batch(
262-
index_name=tmp_index_name,
263-
objects=objects,
264-
wait_for_tasks=True,
265-
request_options=request_options,
254+
copy_operation_response = await self.operation_index(
255+
index_name=index_name,
256+
operation_index_params=OperationIndexParams(
257+
operation="copy",
258+
destination=tmp_index_name,
259+
scope=[
260+
ScopeType("settings"),
261+
ScopeType("synonyms"),
262+
ScopeType("rules"),
263+
],
264+
),
265+
request_options=request_options,
266+
)
267+
await self.wait_for_task(
268+
index_name=index_name, task_id=copy_operation_response.task_id
266269
)
267270

268-
responses += save_resps
269-
270-
move_resp = await self.operation_index(
271-
index_name=tmp_index_name,
272-
operation_index_params=OperationIndexParams(
273-
operation="move",
274-
destination=index_name,
275-
),
276-
request_options=request_options,
277-
)
278-
279-
responses.append(move_resp)
271+
batch_responses = await self.chunked_batch(
272+
index_name=tmp_index_name,
273+
objects=objects,
274+
wait_for_tasks=True,
275+
batch_size=batch_size,
276+
request_options=request_options,
277+
)
280278

281-
await self.wait_for_task(index_name=tmp_index_name, task_id=move_resp.task_id)
279+
move_operation_response = await self.operation_index(
280+
index_name=tmp_index_name,
281+
operation_index_params=OperationIndexParams(
282+
operation="move",
283+
destination=index_name,
284+
),
285+
request_options=request_options,
286+
)
287+
await self.wait_for_task(
288+
index_name=tmp_index_name, task_id=move_operation_response.task_id
289+
)
282290

283-
return responses
291+
return {
292+
"copy_operation_response": copy_operation_response,
293+
"batch_responses": batch_responses,
294+
"move_operation_response": move_operation_response,
295+
}

templates/python/tests/requests/helpers.mustache

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,114 @@ def test_generate_secured_api_key_0(self):
5858
self._client.get_secured_api_key_remaining_validity("foo")
5959
assert False
6060
except Exception as e:
61-
assert str(e) == "Incorrect padding"
61+
assert str(e) == "Incorrect padding"
62+
63+
async def test_replace_all_objects_0(self):
64+
"""
65+
executes with minimal parameters
66+
"""
67+
self._client.batch = AsyncMock(
68+
return_value=BatchResponse(task_id=42, object_ids=["foo", "bar"])
69+
)
70+
self._client.operation_index = AsyncMock(
71+
return_value=UpdatedAtResponse(task_id=21, updated_at="foobar")
72+
)
73+
self._client.get_task = AsyncMock(
74+
return_value=GetTaskResponse(status="published")
75+
)
76+
_resp = await self._client.replace_all_objects(
77+
index_name="foo", objects=[{"name": "John Doe"}]
78+
)
79+
self._client.operation_index.assert_called()
80+
self._client.batch.assert_called()
81+
self._client.operation_index.assert_called()
82+
assert _resp == {
83+
"batch_responses": [BatchResponse(task_id=42, object_ids=["foo", "bar"])],
84+
"copy_operation_response": UpdatedAtResponse(
85+
task_id=21, updated_at="foobar"
86+
),
87+
"move_operation_response": UpdatedAtResponse(
88+
task_id=21, updated_at="foobar"
89+
),
90+
}
91+
92+
async def test_replace_all_objects_1(self):
93+
"""
94+
does many calls when len(objects) > batchSize
95+
"""
96+
self._client.batch = AsyncMock(
97+
return_value=BatchResponse(task_id=42, object_ids=["foo", "bar"])
98+
)
99+
self._client.operation_index = AsyncMock(
100+
return_value=UpdatedAtResponse(task_id=21, updated_at="foobar")
101+
)
102+
self._client.get_task = AsyncMock(
103+
return_value=GetTaskResponse(status="published")
104+
)
105+
_resp = await self._client.replace_all_objects(
106+
index_name="foo",
107+
objects=[
108+
{
109+
"name": f"John Doe{i}",
110+
"objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}",
111+
}
112+
for i in range(33)
113+
],
114+
batch_size=10,
115+
)
116+
self._client.operation_index.assert_called()
117+
self._client.batch.assert_called()
118+
self._client.operation_index.assert_called()
119+
assert _resp == {
120+
"batch_responses": [
121+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
122+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
123+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
124+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
125+
],
126+
"copy_operation_response": UpdatedAtResponse(
127+
task_id=21, updated_at="foobar"
128+
),
129+
"move_operation_response": UpdatedAtResponse(
130+
task_id=21, updated_at="foobar"
131+
),
132+
}
133+
134+
async def test_replace_all_objects_2(self):
135+
"""
136+
batchSize is 1000 by default
137+
"""
138+
self._client.batch = AsyncMock(
139+
return_value=BatchResponse(task_id=42, object_ids=["foo", "bar"])
140+
)
141+
self._client.operation_index = AsyncMock(
142+
return_value=UpdatedAtResponse(task_id=21, updated_at="foobar")
143+
)
144+
self._client.get_task = AsyncMock(
145+
return_value=GetTaskResponse(status="published")
146+
)
147+
_resp = await self._client.replace_all_objects(
148+
index_name="foo",
149+
objects=[
150+
{
151+
"name": f"John Doe{i}",
152+
"objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}",
153+
}
154+
for i in range(1001)
155+
],
156+
)
157+
self._client.operation_index.assert_called()
158+
self._client.batch.assert_called()
159+
self._client.operation_index.assert_called()
160+
assert _resp == {
161+
"batch_responses": [
162+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
163+
BatchResponse(task_id=42, object_ids=["foo", "bar"]),
164+
],
165+
"copy_operation_response": UpdatedAtResponse(
166+
task_id=21, updated_at="foobar"
167+
),
168+
"move_operation_response": UpdatedAtResponse(
169+
task_id=21, updated_at="foobar"
170+
),
171+
}

templates/python/tests/requests/requests.mustache

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
from time import time
22
from os import environ
33
from json import loads
4+
from unittest.mock import AsyncMock
45
from algoliasearch.http.transporter import EchoTransporter
56
from algoliasearch.http.helpers import SecuredApiKeyRestrictions
67
from algoliasearch.{{{import}}}.client import {{#lambda.pascalcase}}{{{client}}}{{/lambda.pascalcase}}
78
from algoliasearch.{{{import}}}.config import {{#lambda.pascalcase}}{{clientPrefix}}Config{{/lambda.pascalcase}}
9+
from algoliasearch.search.models.batch_response import BatchResponse
10+
from algoliasearch.search.models.updated_at_response import UpdatedAtResponse
11+
from algoliasearch.search.models.get_task_response import GetTaskResponse
812
{{#hasE2E}}
913
from ..helpers import Helpers
1014
from dotenv import load_dotenv

0 commit comments

Comments
 (0)