Skip to content

Commit 25b84c9

Browse files
committed
feat(scala): browse helpers
1 parent f598e77 commit 25b84c9

File tree

2 files changed

+176
-3
lines changed

2 files changed

+176
-3
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package algoliasearch.extension.internal
2+
3+
import scala.concurrent.{ExecutionContext, Future, blocking}
4+
import scala.concurrent.duration.Duration
5+
6+
private[algoliasearch] object Iterable {
7+
case class Error[T](
8+
validate: T => Boolean,
9+
message: Option[T => String] = None
10+
)
11+
12+
def createIterable[T](
13+
execute: Option[T] => Future[T],
14+
validate: T => Boolean,
15+
aggregator: Option[T => Unit] = None,
16+
timeout: () => Duration = () => Duration.Zero,
17+
error: Option[Iterable.Error[T]] = None
18+
)(implicit ec: ExecutionContext): Future[T] = {
19+
def executor(previousResponse: Option[T] = None): Future[T] = {
20+
execute(previousResponse).flatMap { response =>
21+
// Call aggregator if defined
22+
aggregator.foreach(agg => agg(response))
23+
24+
// Validate the response
25+
if (validate(response)) {
26+
Future.successful(response)
27+
} else {
28+
// Check for error validation
29+
error match {
30+
case Some(err) if err.validate(response) =>
31+
err.message match {
32+
case Some(errMsg) => Future.failed(new Exception(errMsg(response)))
33+
case None => Future.failed(new Exception("An error occurred"))
34+
}
35+
case _ =>
36+
// Sleep for timeout duration, then retry
37+
blocking(Thread.sleep(timeout().toMillis))
38+
executor(Some(response))
39+
}
40+
}
41+
}
42+
}
43+
44+
executor()
45+
}
46+
}

clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala

Lines changed: 130 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package algoliasearch
33
import algoliasearch.api.SearchClient
44
import algoliasearch.config.RequestOptions
55
import algoliasearch.exception.AlgoliaApiException
6+
import algoliasearch.extension.internal.Iterable.createIterable
67
import algoliasearch.extension.internal.RetryUntil.{DEFAULT_DELAY, retryUntil}
78
import algoliasearch.search._
89

@@ -354,9 +355,6 @@ package object extension {
354355
batchSize: Int = 1000,
355356
requestOptions: Option[RequestOptions] = None
356357
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
357-
val requests = objects.map { record =>
358-
BatchRequest(action = Action.AddObject, body = record)
359-
}
360358
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"
361359

362360
for {
@@ -405,6 +403,12 @@ package object extension {
405403
)
406404
}
407405

406+
/** Check if an index exists.
407+
* @param indexName
408+
* The index name to check.
409+
* @return
410+
* A future containing a boolean indicating if the index exists.
411+
*/
408412
def indexExists(indexName: String)(implicit ec: ExecutionContext): Future[Boolean] = {
409413
try {
410414
client.getSettings(indexName)
@@ -415,5 +419,128 @@ package object extension {
415419

416420
Future.successful(true)
417421
}
422+
423+
/** Browse objects in an index.
424+
* @param indexName
425+
* The index name to browse.
426+
* @param browseParams
427+
* The browse parameters.
428+
* @param validate
429+
* The validation function. Default is to check if the cursor is defined.
430+
* @param aggregator
431+
* The aggregation function. This is where you can aggregate the results.
432+
* @param requestOptions
433+
* Additional request configuration.
434+
* @return
435+
* A future containing the last browse response.
436+
*/
437+
def browseObjects(
438+
indexName: String,
439+
browseParams: BrowseParamsObject,
440+
validate: BrowseResponse => Boolean = response => response.cursor.isEmpty,
441+
aggregator: BrowseResponse => Unit,
442+
requestOptions: Option[RequestOptions] = None
443+
)(implicit ec: ExecutionContext): Future[BrowseResponse] = {
444+
createIterable(
445+
execute = (previousResponse: Option[BrowseResponse]) =>
446+
client.browse(
447+
indexName,
448+
Some(
449+
browseParams.copy(
450+
hitsPerPage = previousResponse.flatMap(_.hitsPerPage.orElse(Some(1000))),
451+
cursor = previousResponse.flatMap(_.cursor)
452+
)
453+
),
454+
requestOptions
455+
),
456+
validate = validate,
457+
aggregator = Some(aggregator)
458+
)
459+
}
460+
461+
/** Browse rules in an index.
462+
* @param indexName
463+
* The index name to browse.
464+
* @param searchRulesParams
465+
* The search rules parameters.
466+
* @param validate
467+
* The validation function. Default is to check if the number of hits is less than the hits per page.
468+
* @param aggregator
469+
* The aggregation function. This is where you can aggregate the results.
470+
* @param requestOptions
471+
* Additional request configuration.
472+
* @return
473+
* A future containing the last search rules response.
474+
*/
475+
def browseRules(
476+
indexName: String,
477+
searchRulesParams: SearchRulesParams,
478+
validate: Option[SearchRulesResponse => Boolean] = None,
479+
aggregator: SearchRulesResponse => Unit,
480+
requestOptions: Option[RequestOptions] = None
481+
)(implicit ec: ExecutionContext): Future[SearchRulesResponse] = {
482+
val hitsPerPage = 1000
483+
484+
createIterable(
485+
execute = (previousResponse: Option[SearchRulesResponse]) =>
486+
client.searchRules(
487+
indexName,
488+
Some(
489+
searchRulesParams.copy(
490+
page = previousResponse.map(_.page + 1).orElse(Some(0)),
491+
hitsPerPage = Some(hitsPerPage)
492+
)
493+
),
494+
requestOptions
495+
),
496+
validate = validate.getOrElse((response: SearchRulesResponse) => response.hits.length < hitsPerPage),
497+
aggregator = Some(aggregator)
498+
)
499+
}
500+
501+
/** Browse synonyms in an index.
502+
* @param indexName
503+
* The index name to browse.
504+
* @param searchSynonymsParams
505+
* The search synonyms parameters.
506+
* @param validate
507+
* The validation function. Default is to check if the number of hits is less than the hits per page.
508+
* @param aggregator
509+
* The aggregation function. This is where you can aggregate the results.
510+
* @param requestOptions
511+
* Additional request configuration.
512+
* @return
513+
* A future containing the last search synonyms response.
514+
*/
515+
def browseSynonyms(
516+
indexName: String,
517+
searchSynonymsParams: SearchSynonymsParams,
518+
validate: Option[SearchSynonymsResponse => Boolean] = None,
519+
aggregator: SearchSynonymsResponse => Unit,
520+
requestOptions: Option[RequestOptions] = None
521+
)(implicit ec: ExecutionContext): Future[SearchSynonymsResponse] = {
522+
val hitsPerPage = 1000
523+
var page = searchSynonymsParams.page.getOrElse(0)
524+
525+
createIterable(
526+
execute = (_: Option[SearchSynonymsResponse]) =>
527+
try {
528+
client.searchSynonyms(
529+
indexName,
530+
Some(
531+
searchSynonymsParams.copy(
532+
page = Some(page),
533+
hitsPerPage = Some(hitsPerPage)
534+
)
535+
),
536+
requestOptions
537+
)
538+
} finally {
539+
page += 1
540+
},
541+
validate = validate.getOrElse((response: SearchSynonymsResponse) => response.hits.length < hitsPerPage),
542+
aggregator = Some(aggregator)
543+
)
544+
}
418545
}
419546
}

0 commit comments

Comments
 (0)