Skip to content

Commit 02c8f0d

Browse files
committed
Parallel take first adapter
1 parent dc24f73 commit 02c8f0d

File tree

4 files changed

+41
-2
lines changed

4 files changed

+41
-2
lines changed

openai-core/src/main/scala/io/cequence/openaiscala/service/adapter/ChatCompletionServiceAdapter.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,4 @@ private class ChatCompletionServiceAdapter[S <: CloseableService](
3131
chatCompletionService.close()
3232
underlying.close()
3333
}
34-
3534
}

openai-core/src/main/scala/io/cequence/openaiscala/service/adapter/MultiServiceAdapter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,4 @@ private class RandomOrderAdapter[+S <: CloseableService](
3636
val underlyings: Seq[S]
3737
) extends MultiServiceAdapter[S] {
3838
protected def calcIndex: Int = Random.nextInt(count)
39-
}
39+
}

openai-core/src/main/scala/io/cequence/openaiscala/service/adapter/OpenAIServiceAdapters.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.cequence.openaiscala.service.adapter
22

33
import akka.actor.Scheduler
4+
import akka.stream.Materializer
45
import io.cequence.openaiscala.RetryHelpers.RetrySettings
56
import io.cequence.openaiscala.domain.BaseMessage
67
import io.cequence.openaiscala.domain.settings.CreateChatCompletionSettings
@@ -34,6 +35,12 @@ trait OpenAIServiceAdapters[S <: CloseableService] {
3435
): S =
3536
wrapAndDelegate(new RandomOrderAdapter(underlyings))
3637

38+
def parallelTakeFirst(
39+
underlyings: S*)(
40+
implicit materializer: Materializer
41+
): S =
42+
wrapAndDelegate(new ParallelTakeFirstAdapter(underlyings))
43+
3744
def retry(
3845
underlying: S,
3946
log: Option[String => Unit] = None
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.cequence.openaiscala.service.adapter
2+
3+
import akka.stream.Materializer
4+
import akka.stream.scaladsl.{Sink, Source}
5+
import io.cequence.wsclient.service.CloseableService
6+
import org.slf4j.LoggerFactory
7+
8+
import scala.concurrent.Future
9+
10+
private class ParallelTakeFirstAdapter[+S <: CloseableService](
11+
underlyings: Seq[S]
12+
)(
13+
implicit materializer: Materializer
14+
) extends ServiceWrapper[S]
15+
with CloseableService {
16+
17+
private val logger = LoggerFactory.getLogger(getClass)
18+
19+
override protected[adapter] def wrap[T](
20+
fun: S => Future[T]
21+
): Future[T] = {
22+
logger.debug(s"Running parallel/redundant processing with ${underlyings.size} services.")
23+
24+
val sources = Source
25+
.fromIterator(() => underlyings.toIterator)
26+
.mapAsyncUnordered(underlyings.size)(fun)
27+
28+
sources.runWith(Sink.head)
29+
}
30+
31+
override def close(): Unit =
32+
underlyings.foreach(_.close())
33+
}

0 commit comments

Comments
 (0)