Skip to content

feat(clients): allow batch size on objects helper [skip-bc] #4172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,26 @@ public partial interface ISearchClient
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="SaveObjectsAsync{T}(string, IEnumerable{T}, RequestOptions, CancellationToken)"/>
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Deletes every records for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objectIDs in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default);
/// <inheritdoc cref="DeleteObjectsAsync(string, IEnumerable{String}, RequestOptions, CancellationToken)"/>
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default);

/// <summary>
/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objects in it.
Expand All @@ -193,11 +195,12 @@ public partial interface ISearchClient
/// <param name="objects">The list of `objects` to update in the given Algolia `indexName`.</param>
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="PartialUpdateObjectsAsync{T}(string, IEnumerable{T}, bool, RequestOptions, CancellationToken)"/>
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Check if an index exists.
Expand Down Expand Up @@ -568,43 +571,45 @@ public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> obje
/// <inheritdoc/>
public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null,
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, batchSize, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null,
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) =>
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, batchSize, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, batchSize, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, int batchSize = 1000,
RequestOptions options = null, CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, options, cancellationToken));
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, batchSize, options, cancellationToken));

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Func<TU, bool> stopCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ public suspend fun SearchClient.chunkedBatch(
* @param indexName The index in which to perform the request.
* @param objects The list of objects to index.
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -379,14 +380,15 @@ public suspend fun SearchClient.saveObjects(
indexName: String,
objects: List<JsonObject>,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = Action.AddObject,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand All @@ -397,6 +399,7 @@ public suspend fun SearchClient.saveObjects(
* @param indexName The index in which to perform the request.
* @param objectIDs The list of objectIDs to delete from the index.
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -405,14 +408,15 @@ public suspend fun SearchClient.deleteObjects(
indexName: String,
objectIDs: List<String>,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objectIDs.map { id -> JsonObject(mapOf("objectID" to Json.encodeToJsonElement(id))) },
action = Action.DeleteObject,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand All @@ -424,6 +428,7 @@ public suspend fun SearchClient.deleteObjects(
* @param objects The list of objects to update in the index.
* @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail..
* @param waitForTask If true, wait for the task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -433,14 +438,15 @@ public suspend fun SearchClient.partialUpdateObjects(
objects: List<JsonObject>,
createIfNotExists: Boolean,
waitForTask: Boolean = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTask = waitForTask,
batchSize = 1000,
batchSize = batchSize,
requestOptions = requestOptions,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ package object extension {
* The list of objects to save.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -263,9 +265,10 @@ package object extension {
indexName: String,
objects: Seq[Any],
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, 1000, requestOptions)
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, batchSize, requestOptions)
}

/** Helper: Deletes every objects for the given objectIDs. The `chunkedBatch` helper is used under the hood, which
Expand All @@ -277,6 +280,8 @@ package object extension {
* The list of objectIDs to delete.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -286,14 +291,15 @@ package object extension {
indexName: String,
objectIDs: Seq[String],
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objectIDs.map(id => new { val objectID: String = id }),
Action.DeleteObject,
waitForTasks,
1000,
batchSize,
requestOptions
)
}
Expand All @@ -309,6 +315,8 @@ package object extension {
* To be provided if non-existing objects are passed, otherwise, the call will fail.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -319,14 +327,15 @@ package object extension {
objects: Seq[Any],
createIfNotExists: Boolean = false,
waitForTasks: Boolean = false,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objects,
if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTasks,
1000,
batchSize,
requestOptions
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,20 +468,22 @@ public extension SearchClient {
/// - parameter indexName: The name of the index where to save the objects
/// - parameter objects: The new objects
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func saveObjects(
indexName: String,
objects: [some Encodable],
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: .addObject,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand All @@ -491,20 +493,22 @@ public extension SearchClient {
/// - parameter indexName: The name of the index to delete objectIDs from
/// - parameter objectIDs: The objectIDs to delete
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func deleteObjects(
indexName: String,
objectIDs: [String],
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objectIDs.map { AnyCodable(["objectID": $0]) },
action: .deleteObject,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand All @@ -516,21 +520,23 @@ public extension SearchClient {
/// - parameter createIfNotExists: To be provided if non-existing objects are passed, otherwise, the call will
/// fail..
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter batchSize: The maximum number of objects to include in a batch
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func partialUpdateObjects(
indexName: String,
objects: [some Encodable],
createIfNotExists: Bool = false,
waitForTasks: Bool = false,
batchSize: Int = 1000,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: createIfNotExists ? .partialUpdateObject : .partialUpdateObjectNoCreate,
waitForTasks: waitForTasks,
batchSize: 1000,
batchSize: batchSize,
requestOptions: requestOptions
)
}
Expand Down
6 changes: 6 additions & 0 deletions specs/search/helpers/deleteObjects.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ method:
required: false
schema:
type: boolean
- in: query
name: batchSize
description: The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
required: false
schema:
type: integer
- in: query
name: requestOptions
description: The request options to pass to the `batch` method.
Expand Down
Loading