Skip to content

Commit 6876495

Browse files
committed
feat(scala): browse helpers
1 parent f29dfa7 commit 6876495

File tree

2 files changed

+147
-3
lines changed

2 files changed

+147
-3
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package algoliasearch.extension.internal
2+
3+
import scala.concurrent.{ExecutionContext, Future, blocking}
4+
import scala.concurrent.duration.Duration
5+
6+
private[algoliasearch] object Iterable {
7+
case class Error[T](
8+
validate: T => Boolean,
9+
message: Option[T => String] = None
10+
)
11+
12+
def createIterable[T](
13+
execute: Option[T] => Future[T],
14+
validate: T => Boolean,
15+
aggregator: Option[T => Unit] = None,
16+
timeout: () => Duration = () => Duration.Zero,
17+
error: Option[Iterable.Error[T]] = None
18+
)(implicit ec: ExecutionContext): Future[T] = {
19+
def executor(previousResponse: Option[T] = None): Future[T] = {
20+
execute(previousResponse).flatMap { response =>
21+
// Call aggregator if defined
22+
aggregator.foreach(agg => agg(response))
23+
24+
// Validate the response
25+
if (validate(response)) {
26+
Future.successful(response)
27+
} else {
28+
// Check for error validation
29+
error match {
30+
case Some(err) if err.validate(response) =>
31+
err.message match {
32+
case Some(errMsg) => Future.failed(new Exception(errMsg(response)))
33+
case None => Future.failed(new Exception("An error occurred"))
34+
}
35+
case _ =>
36+
// Sleep for timeout duration, then retry
37+
blocking(Thread.sleep(timeout().toMillis))
38+
executor(Some(response))
39+
}
40+
}
41+
}
42+
}
43+
44+
executor()
45+
}
46+
}

clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package algoliasearch
33
import algoliasearch.api.SearchClient
44
import algoliasearch.config.RequestOptions
55
import algoliasearch.exception.AlgoliaApiException
6+
import algoliasearch.extension.internal.Iterable.createIterable
67
import algoliasearch.extension.internal.RetryUntil.{DEFAULT_DELAY, retryUntil}
78
import algoliasearch.search._
89

@@ -354,9 +355,6 @@ package object extension {
354355
batchSize: Int = 1000,
355356
requestOptions: Option[RequestOptions] = None
356357
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
357-
val requests = objects.map { record =>
358-
BatchRequest(action = Action.AddObject, body = record)
359-
}
360358
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"
361359

362360
for {
@@ -405,6 +403,11 @@ package object extension {
405403
)
406404
}
407405

406+
/**
407+
* Check if an index exists.
408+
* @param indexName The index name to check.
409+
* @return A future containing a boolean indicating if the index exists.
410+
*/
408411
def indexExists(indexName: String)(implicit ec: ExecutionContext): Future[Boolean] = {
409412
try {
410413
client.getSettings(indexName)
@@ -415,5 +418,100 @@ package object extension {
415418

416419
Future.successful(true)
417420
}
421+
422+
/**
423+
* Browse objects in an index.
424+
* @param indexName The index name to browse.
425+
* @param browseParams The browse parameters.
426+
* @param validate The validation function. Default is to check if the cursor is defined.
427+
* @param aggregator The aggregation function. This is where you can aggregate the results.
428+
* @param requestOptions Additional request configuration.
429+
* @return A future containing the last browse response.
430+
*/
431+
def browseObjects(
432+
indexName: String,
433+
browseParams: BrowseParamsObject,
434+
validate: BrowseResponse => Boolean = response => response.cursor.isDefined,
435+
aggregator: BrowseResponse => Unit,
436+
requestOptions: Option[RequestOptions] = None
437+
)(implicit ec: ExecutionContext): Future[BrowseResponse] = {
438+
createIterable(
439+
execute = (previousResponse: Option[BrowseResponse]) =>
440+
client.browse(indexName, Some(browseParams.copy(cursor = previousResponse.flatMap(_.cursor))), requestOptions),
441+
validate = validate,
442+
aggregator = Some(aggregator),
443+
)
444+
}
445+
446+
/**
447+
* Browse rules in an index.
448+
* @param indexName The index name to browse.
449+
* @param searchRulesParams The search rules parameters.
450+
* @param validate The validation function. Default is to check if the number of hits is less than the hits per page.
451+
* @param aggregator The aggregation function. This is where you can aggregate the results.
452+
* @param requestOptions Additional request configuration.
453+
* @return A future containing the last search rules response.
454+
*/
455+
def browseRules(
456+
indexName: String,
457+
searchRulesParams: SearchRulesParams,
458+
validate: Option[SearchRulesResponse => Boolean] = None,
459+
aggregator: SearchRulesResponse => Unit,
460+
requestOptions: Option[RequestOptions] = None
461+
)(implicit ec: ExecutionContext): Future[SearchRulesResponse] = {
462+
val hitsPerPage = 1000
463+
464+
createIterable(
465+
execute = (previousResponse: Option[SearchRulesResponse]) =>
466+
client.searchRules(
467+
indexName,
468+
Some(searchRulesParams.copy(
469+
page = previousResponse.map(_.page + 1).orElse(Some(0)),
470+
hitsPerPage = Some(hitsPerPage),
471+
)),
472+
requestOptions
473+
),
474+
validate = validate.getOrElse((response: SearchRulesResponse) => response.hits.length < hitsPerPage),
475+
aggregator = Some(aggregator),
476+
)
477+
}
478+
479+
/**
480+
* Browse synonyms in an index.
481+
* @param indexName The index name to browse.
482+
* @param searchSynonymsParams The search synonyms parameters.
483+
* @param validate The validation function. Default is to check if the number of hits is less than the hits per page.
484+
* @param aggregator The aggregation function. This is where you can aggregate the results.
485+
* @param requestOptions Additional request configuration.
486+
* @return A future containing the last search synonyms response.
487+
*/
488+
def browseSynonyms(
489+
indexName: String,
490+
searchSynonymsParams: SearchSynonymsParams,
491+
validate: Option[SearchSynonymsResponse => Boolean] = None,
492+
aggregator: SearchSynonymsResponse => Unit,
493+
requestOptions: Option[RequestOptions] = None
494+
)(implicit ec: ExecutionContext): Future[SearchSynonymsResponse] = {
495+
val hitsPerPage = 1000
496+
var page = searchSynonymsParams.page.getOrElse(0)
497+
498+
createIterable(
499+
execute = (_: Option[SearchSynonymsResponse]) =>
500+
try {
501+
client.searchSynonyms(
502+
indexName,
503+
Some(searchSynonymsParams.copy(
504+
page = Some(page),
505+
hitsPerPage = Some(hitsPerPage),
506+
)),
507+
requestOptions
508+
)
509+
} finally {
510+
page += 1
511+
},
512+
validate = validate.getOrElse((response: SearchSynonymsResponse) => response.hits.length < hitsPerPage),
513+
aggregator = Some(aggregator),
514+
)
515+
}
418516
}
419517
}

0 commit comments

Comments
 (0)