Skip to content

RetryHelpers trait to implement non-blocking retries #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
40b4b80
RetryHelpers trait to allow non-blocking retries (#15)
phelps-sg Jun 9, 2023
282b3f8
provide materializer in trait to reduce boilerplate, and specify defa…
phelps-sg Jun 9, 2023
af5ebe4
OpenAIScalaClientException can be pattern matched against Retryable(_…
phelps-sg Jun 10, 2023
df1fd22
fix type parameter
phelps-sg Jun 10, 2023
9cb877b
scalatest and mockito deps
phelps-sg Jun 10, 2023
e6531d0
basic unit test for RetryHelpers, and fixed errors in delay method
phelps-sg Jun 10, 2023
d4787fe
new test for non-retryable failures (#16)
phelps-sg Jun 10, 2023
e342fb2
factored out duplicated constant
phelps-sg Jun 10, 2023
b8e316b
deprecate OpenAIRetryServiceAdapter
phelps-sg Jun 10, 2023
7bea15e
use correct exponential backoff interval as a function of number of a…
phelps-sg Jun 10, 2023
c302744
formatting
phelps-sg Jun 10, 2023
72f8397
new OpenAIRetryServiceAdapter in openai-client package
phelps-sg Jun 10, 2023
6ede80b
TODO on separate classes for #16
phelps-sg Jun 10, 2023
d1daf0a
use ScalaFutures patterns to await futures
phelps-sg Jun 10, 2023
1a15aeb
use Scala Int not Integer
phelps-sg Jun 11, 2023
c261b32
factor out common code and use async helpers for testing exception to…
phelps-sg Jun 11, 2023
cb5b547
fix incorrect attempts val in testWithException
phelps-sg Jun 11, 2023
f0385f0
correct test logic for "not retry on success"
phelps-sg Jun 11, 2023
60332d7
factor out dup code into verifyNumAttempts
phelps-sg Jun 11, 2023
9a7b07c
remove attempts param for testWithException since it can be derived f…
phelps-sg Jun 11, 2023
9788214
remove unused attempts local val
phelps-sg Jun 11, 2023
5b42350
remove dep on RetrySupport as no longer required
phelps-sg Jun 11, 2023
d1d0435
renamed settings attributes to fit with math expression
phelps-sg Jun 11, 2023
945c9dc
protected access for delay and retry
phelps-sg Jun 11, 2023
6cab701
Revert "protected access for delay and retry"
phelps-sg Jun 11, 2023
a7780c2
fix incorrect test package name
phelps-sg Jun 11, 2023
5af7e34
protected access for delay and retry
phelps-sg Jun 11, 2023
3ea5ae7
Revert "protected access for delay and retry"
phelps-sg Jun 11, 2023
e5ddc04
private[package] access for delay in and retry methods in implicit class
phelps-sg Jun 11, 2023
3bd33e7
new test to fail if retries exhausted, and reduced backoff delay for …
phelps-sg Jun 12, 2023
31442b2
move fixtures to bottom of class for readability
phelps-sg Jun 12, 2023
c21f843
allow easy configuration of a constant retry interval
phelps-sg Jun 15, 2023
2eff890
companion object for RetrySettings to allow constant interval to be s…
phelps-sg Jun 15, 2023
f150eb0
use correct base of zero for constant interval
phelps-sg Jun 15, 2023
a1f700d
move non-implicit methods to companion object for easier testing
phelps-sg Jun 15, 2023
7174cb5
additional tests for delay()
phelps-sg Jun 15, 2023
0aa171b
update README with new retry usage
phelps-sg Jun 15, 2023
82603af
merge upstream and resolve conflicts
phelps-sg Jun 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ This extension of the standard chat completion is currently supported by the fol
- `gpt-3.5-turbo-0613` (default), `gpt-3.5-turbo-16k-0613`, `gpt-4-0613`, and `gpt-4-32k-0613`.


**✔️ Important Note**: After you are done using the service, you should close it by calling `service.close`. Otherwise, the underlying resources/threads won't be released.
**✔️ Important Note**: After you are done using the service, you should close it by calling (🔥 new) `service.close`. Otherwise, the underlying resources/threads won't be released.

**III. Using multiple services (🔥 new)**

Expand Down Expand Up @@ -276,17 +276,48 @@ This extension of the standard chat completion is currently supported by the fol
}
```

- Create completion and retry on transient errors (e.g. rate limit error)
```scala
import akka.actor.{ActorSystem, Scheduler}
import io.cequence.openaiscala.RetryHelpers
import io.cequence.openaiscala.RetryHelpers.RetrySettings
import io.cequence.openaiscala.domain.{ChatRole, MessageSpec}
import io.cequence.openaiscala.service.{OpenAIService, OpenAIServiceFactory}

import javax.inject.Inject
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

class MyCompletionService @Inject() (
val actorSystem: ActorSystem,
implicit val ec: ExecutionContext,
implicit val scheduler: Scheduler
)(val apiKey: String)
extends RetryHelpers {
val service: OpenAIService = OpenAIServiceFactory(apiKey)
implicit val retrySettings: RetrySettings =
RetrySettings(interval = 10.seconds)

def ask(prompt: String): Future[String] =
for {
completion <- service
.createChatCompletion(
List(MessageSpec(ChatRole.User, prompt))
)
.retryOnFailure
} yield completion.choices.head.message.content
}
```

- Retries with `OpenAIRetryServiceAdapter`

```scala
val serviceAux = ... // your service

implicit val retrySettings: RetrySettings =
RetrySettings(maxAttempts = 10).constantInterval(10.seconds)
// wrap it with the retry adapter
val service = OpenAIRetryServiceAdapter(
serviceAux,
maxAttempts = 10,
sleepOnFailureMs = Some(1000) // 1 second
)
val service = OpenAIRetryServiceAdapter(serviceAux)

service.listModels.map { models =>
models.foreach(println)
Expand Down
12 changes: 12 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,35 @@ import sbt.Keys.test
val scala212 = "2.12.18"
val scala213 = "2.13.11"
val scala3 = "3.2.2"
val AkkaVersion = "2.6.1"

ThisBuild / organization := "io.cequence"
ThisBuild / scalaVersion := scala212
ThisBuild / version := "0.4.0"
ThisBuild / isSnapshot := false

lazy val commonSettings = Seq(
libraryDependencies += "org.scalactic" %% "scalactic" % "3.2.16",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.16" % Test,
libraryDependencies += "org.mockito" %% "mockito-scala-scalatest" % "1.17.14" % Test,
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test
)

lazy val core = (project in file("openai-core"))
.settings(commonSettings: _*)

lazy val client = (project in file("openai-client"))
.settings(commonSettings: _*)
.dependsOn(core)
.aggregate(core)

lazy val client_stream = (project in file("openai-client-stream"))
.settings(commonSettings: _*)
.dependsOn(client)
.aggregate(client)

lazy val guice = (project in file("openai-guice"))
.settings(commonSettings: _*)
.dependsOn(client)
.aggregate(client_stream)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.cequence.openaiscala

import akka.actor.{ActorSystem, Scheduler}
import akka.pattern.after
import akka.stream.Materializer
import io.cequence.openaiscala.RetryHelpers.{RetrySettings, retry}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object RetryHelpers {
private[openaiscala] def delay(
n: Integer
)(implicit retrySettings: RetrySettings): FiniteDuration =
FiniteDuration(
scala.math.round(
retrySettings.delayOffset.length + scala.math.pow(
retrySettings.delayBase,
n.doubleValue()
)
),
retrySettings.delayOffset.unit
)

private[openaiscala] def retry[T](
attempt: () => Future[T],
attempts: Int
)(implicit
ec: ExecutionContext,
scheduler: Scheduler,
retrySettings: RetrySettings
): Future[T] = {
try {
if (attempts > 0) {
attempt().recoverWith { case Retryable(_) =>
after(delay(attempts), scheduler) {
retry(attempt, attempts - 1)
}
}
} else {
attempt()
}
} catch {
case NonFatal(error) => Future.failed(error)
}
}

final case class RetrySettings(
maxRetries: Int = 5,
delayOffset: FiniteDuration = 2.seconds,
delayBase: Double = 2
) {
def constantInterval(interval: FiniteDuration): RetrySettings =
copy(delayBase = 0).copy(delayOffset = interval)
}

object RetrySettings {
def apply(interval: FiniteDuration): RetrySettings =
RetrySettings().constantInterval(
interval
)

}

}

trait RetryHelpers {

def actorSystem: ActorSystem
implicit val materializer: Materializer = Materializer(actorSystem)

implicit class FutureWithRetry[T](f: Future[T]) {

def retryOnFailure(implicit
retrySettings: RetrySettings,
ec: ExecutionContext,
scheduler: Scheduler
): Future[T] = {
retry(() => f, retrySettings.maxRetries)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.cequence.openaiscala.service

import akka.actor.{ActorSystem, Scheduler}
import io.cequence.openaiscala.RetryHelpers
import io.cequence.openaiscala.RetryHelpers.RetrySettings

import scala.concurrent.{ExecutionContext, Future}

private class OpenAIRetryServiceAdapter(
underlying: OpenAIService,
val actorSystem: ActorSystem,
implicit val ec: ExecutionContext,
implicit val retrySettings: RetrySettings,
implicit val scheduler: Scheduler
) extends OpenAIServiceWrapper
with RetryHelpers {

override def close: Unit =
underlying.close

override protected def wrap[T](
fun: OpenAIService => Future[T]
): Future[T] = {
fun(underlying).retryOnFailure
}
}

object OpenAIRetryServiceAdapter {
def apply(underlying: OpenAIService)(implicit
ec: ExecutionContext,
retrySettings: RetrySettings,
scheduler: Scheduler,
actorSystem: ActorSystem
): OpenAIService =
new OpenAIRetryServiceAdapter(
underlying,
actorSystem,
ec,
retrySettings,
scheduler
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.cequence.openaiscala

import akka.actor.{ActorSystem, Scheduler}
import akka.testkit.TestKit
import io.cequence.openaiscala.RetryHelpers.{RetrySettings, delay, retry}
import org.mockito.scalatest.MockitoSugar
import org.scalatest.RecoverMethods._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterAll, Succeeded}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}

class RetryHelpersSpec
extends TestKit(ActorSystem("RetryHelpersSpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with MockitoSugar
with ScalaFutures
with RetryHelpers {
val successfulResult = 42

implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds)

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

"RetrySettings" should {
"allow easy configuration of a constant interval" in {
val interval = 10.seconds
val result = RetrySettings(interval)
result.delayBase shouldBe 0
result.delayOffset shouldBe interval
}
}

"RetryHelpers" should {

"retry when encountering a retryable failure" in {
val attempts = 2
val ex = new OpenAIScalaClientTimeoutException("retryable test exception")
testWithException(ex) { (mockRetryable, result) =>
result.futureValue shouldBe successfulResult
verifyNumAttempts(n = attempts, result, mockRetryable)
}
}

"not retry when encountering a non-retryable failure" in {
val ex = new OpenAIScalaClientUnknownHostException(
"non retryable test exception"
)
testWithException(ex) { (mockRetryable, result) =>
val f = for {
_ <- recoverToExceptionIf[OpenAIScalaClientUnknownHostException](
result
)
} yield mockRetryable
verifyNumAttempts(n = 1, f, mockRetryable)
}
}

"not retry on success" in {
testWithResults(attempts = 2, Seq(Future.successful(successfulResult))) {
(mockRetryable, result) =>
result.futureValue shouldBe successfulResult
verifyNumAttempts(n = 1, result, mockRetryable)
}
}

"fail when max retries exceeded" in {
val ex = Future.failed {
new OpenAIScalaClientTimeoutException("retryable exception")
}
testWithResults(
attempts = 2,
Seq(ex, ex, ex, Future.successful(successfulResult))
) { (_, result) =>
recoverToSucceededIf[OpenAIScalaClientTimeoutException](
result
).futureValue shouldBe Succeeded
}
}

"compute the correct delay when using constant interval" in {
val interval = 10.seconds
val settings = RetrySettings(interval)
delay(1)(settings) shouldBe interval
delay(5)(settings) shouldBe interval
}

"compute the correct delay when using strictly positive base" in {
val settings = RetrySettings(
maxRetries = 5,
delayOffset = 2.seconds,
delayBase = 2
)
delay(1)(settings) shouldBe 4.seconds
delay(2)(settings) shouldBe 6.seconds
delay(3)(settings) shouldBe 10.seconds
}

}

implicit val scheduler: Scheduler = actorSystem.scheduler

override def patienceConfig: PatienceConfig = patience
implicit val retrySettings: RetrySettings = RetrySettings(
maxRetries = 5,
delayOffset = 0.seconds,
delayBase = 1
)

def testWithException(ex: OpenAIScalaClientException)(
test: (Retryable, Future[Int]) => Unit
): Unit = {
val results = Seq(Future.failed(ex), Future.successful(successfulResult))
testWithResults(results.length, results)(test)
}

def testWithResults(attempts: Int, results: Seq[Future[Int]])(
test: (Retryable, Future[Int]) => Unit
): Unit = {
val future = Promise[Int]().future
val mockRetryable = mock[Retryable]
when(mockRetryable.attempt())
.thenReturn(results.head, results.takeRight(results.length - 1): _*)
val result = retry(() => mockRetryable.attempt(), attempts)
test(mockRetryable, result)
}

def verifyNumAttempts[T](n: Int, f: Future[T], mock: Retryable): Unit =
whenReady(f) { _ =>
verify(mock, times(n)).attempt()
}

override def actorSystem: ActorSystem = system
}

trait Retryable {
def attempt(): Future[Int]
}
Loading