Skip to content

feat(scala): browse helpers #4062

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package algoliasearch.extension.internal

import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.concurrent.duration.Duration

private[algoliasearch] object Iterable {
case class Error[T](
validate: T => Boolean,
message: Option[T => String] = None
)

def createIterable[T](
execute: Option[T] => Future[T],
validate: T => Boolean,
aggregator: Option[T => Unit] = None,
timeout: () => Duration = () => Duration.Zero,
error: Option[Iterable.Error[T]] = None
)(implicit ec: ExecutionContext): Future[T] = {
def executor(previousResponse: Option[T] = None): Future[T] = {
execute(previousResponse).flatMap { response =>
// Call aggregator if defined
aggregator.foreach(agg => agg(response))

// Validate the response
if (validate(response)) {
Future.successful(response)
} else {
// Check for error validation
error match {
case Some(err) if err.validate(response) =>
err.message match {
case Some(errMsg) => Future.failed(new Exception(errMsg(response)))
case None => Future.failed(new Exception("An error occurred"))
}
case _ =>
// Sleep for timeout duration, then retry
blocking(Thread.sleep(timeout().toMillis))
executor(Some(response))
}
}
}
}

executor()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package algoliasearch
import algoliasearch.api.SearchClient
import algoliasearch.config.RequestOptions
import algoliasearch.exception.AlgoliaApiException
import algoliasearch.extension.internal.Iterable.createIterable
import algoliasearch.extension.internal.RetryUntil.{DEFAULT_DELAY, retryUntil}
import algoliasearch.search._

Expand Down Expand Up @@ -354,9 +355,6 @@ package object extension {
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val requests = objects.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

for {
Expand Down Expand Up @@ -405,6 +403,12 @@ package object extension {
)
}

/** Check if an index exists.
* @param indexName
* The index name to check.
* @return
* A future containing a boolean indicating if the index exists.
*/
def indexExists(indexName: String)(implicit ec: ExecutionContext): Future[Boolean] = {
try {
client.getSettings(indexName)
Expand All @@ -415,5 +419,128 @@ package object extension {

Future.successful(true)
}

/** Browse objects in an index.
* @param indexName
* The index name to browse.
* @param browseParams
* The browse parameters.
* @param validate
* The validation function. Default is to check if the cursor is defined.
* @param aggregator
* The aggregation function. This is where you can aggregate the results.
* @param requestOptions
* Additional request configuration.
* @return
* A future containing the last browse response.
*/
def browseObjects(
indexName: String,
browseParams: BrowseParamsObject,
validate: BrowseResponse => Boolean = response => response.cursor.isEmpty,
aggregator: BrowseResponse => Unit,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[BrowseResponse] = {
createIterable(
execute = (previousResponse: Option[BrowseResponse]) =>
client.browse(
indexName,
Some(
browseParams.copy(
hitsPerPage = previousResponse.flatMap(_.hitsPerPage.orElse(Some(1000))),
cursor = previousResponse.flatMap(_.cursor)
)
),
requestOptions
),
validate = validate,
aggregator = Some(aggregator)
)
}

/** Browse rules in an index.
* @param indexName
* The index name to browse.
* @param searchRulesParams
* The search rules parameters.
* @param validate
* The validation function. Default is to check if the number of hits is less than the hits per page.
* @param aggregator
* The aggregation function. This is where you can aggregate the results.
* @param requestOptions
* Additional request configuration.
* @return
* A future containing the last search rules response.
*/
def browseRules(
indexName: String,
searchRulesParams: SearchRulesParams,
validate: Option[SearchRulesResponse => Boolean] = None,
aggregator: SearchRulesResponse => Unit,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[SearchRulesResponse] = {
val hitsPerPage = 1000

createIterable(
execute = (previousResponse: Option[SearchRulesResponse]) =>
client.searchRules(
indexName,
Some(
searchRulesParams.copy(
page = previousResponse.map(_.page + 1).orElse(Some(0)),
hitsPerPage = Some(hitsPerPage)
)
),
requestOptions
),
validate = validate.getOrElse((response: SearchRulesResponse) => response.hits.length < hitsPerPage),
aggregator = Some(aggregator)
)
}

/** Browse synonyms in an index.
* @param indexName
* The index name to browse.
* @param searchSynonymsParams
* The search synonyms parameters.
* @param validate
* The validation function. Default is to check if the number of hits is less than the hits per page.
* @param aggregator
* The aggregation function. This is where you can aggregate the results.
* @param requestOptions
* Additional request configuration.
* @return
* A future containing the last search synonyms response.
*/
def browseSynonyms(
indexName: String,
searchSynonymsParams: SearchSynonymsParams,
validate: Option[SearchSynonymsResponse => Boolean] = None,
aggregator: SearchSynonymsResponse => Unit,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[SearchSynonymsResponse] = {
val hitsPerPage = 1000
var page = searchSynonymsParams.page.getOrElse(0)

createIterable(
execute = (_: Option[SearchSynonymsResponse]) =>
try {
client.searchSynonyms(
indexName,
Some(
searchSynonymsParams.copy(
page = Some(page),
hitsPerPage = Some(hitsPerPage)
)
),
requestOptions
)
} finally {
page += 1
},
validate = validate.getOrElse((response: SearchSynonymsResponse) => response.hits.length < hitsPerPage),
aggregator = Some(aggregator)
)
}
}
}
Loading