Skip to content

Commit 35014b5

Browse files
authored
fix: Set default value for thread keep-alive to 1 minute (#594)
feat: Make blocking thread pool executor configurable Closes #593
1 parent 8da0032 commit 35014b5

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

jvm/src/main/scala/com/avast/sst/jvm/execution/ExecutorModule.scala

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,56 +53,71 @@ class ExecutorModule[F[_]: Sync](
5353

5454
object ExecutorModule {
5555

56+
private final val DefaultBlockingExecutorConfig = ThreadPoolExecutorConfig(0, Int.MaxValue, allowCoreThreadTimeout = true)
57+
5658
/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor
5759
* for blocking operations.
5860
*/
5961
def makeDefault[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
6062
for {
6163
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
6264
coreSize = numOfCpus * 2
63-
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
65+
executor <- makeThreadPoolExecutor(
66+
ThreadPoolExecutorConfig(coreSize, coreSize, allowCoreThreadTimeout = true),
67+
toolkitThreadFactory,
68+
new LinkedBlockingQueue
69+
)
6470
.map(ExecutionContext.fromExecutorService)
65-
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
71+
blockingExecutor <- makeBlockingExecutor(DefaultBlockingExecutorConfig).map(ExecutionContext.fromExecutorService)
6672
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
6773
}
6874

6975
/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]]
7076
* executor for blocking operations.
7177
*/
72-
def makeFromExecutionContext[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
78+
def makeFromExecutionContext[F[_]: Sync](
79+
executor: ExecutionContext,
80+
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
81+
): Resource[F, ExecutorModule[F]] = {
7382
for {
7483
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
75-
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
84+
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
7685
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
7786
}
7887

7988
/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor
8089
* for blocking operations.
8190
*/
82-
def makeFromConfig[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
91+
def makeFromConfig[F[_]: Sync](
92+
executorConfig: ThreadPoolExecutorConfig,
93+
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
94+
): Resource[F, ExecutorModule[F]] = {
8395
for {
8496
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
8597
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
8698
.map(ExecutionContext.fromExecutorService)
87-
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
99+
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
88100
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
89101
}
90102

91103
/** Makes [[com.avast.sst.jvm.execution.ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor
92104
* for blocking operations.
93105
*/
94-
def makeForkJoinFromConfig[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
106+
def makeForkJoinFromConfig[F[_]: Sync](
107+
executorConfig: ForkJoinPoolConfig,
108+
blockingExecutorConfig: ThreadPoolExecutorConfig = DefaultBlockingExecutorConfig
109+
): Resource[F, ExecutorModule[F]] = {
95110
for {
96111
numOfCpus <- Resource.eval(Sync[F].delay(Runtime.getRuntime.availableProcessors))
97112
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
98113
.map(ExecutionContext.fromExecutorService)
99-
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
114+
blockingExecutor <- makeBlockingExecutor(blockingExecutorConfig).map(ExecutionContext.fromExecutorService)
100115
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
101116
}
102117

103-
private def makeBlockingExecutor[F[_]: Sync] =
118+
private def makeBlockingExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig) =
104119
makeThreadPoolExecutor[F](
105-
ThreadPoolExecutorConfig(0, Int.MaxValue),
120+
config,
106121
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
107122
new SynchronousQueue
108123
)

jvm/src/main/scala/com/avast/sst/jvm/execution/ThreadPoolExecutorConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
66
final case class ThreadPoolExecutorConfig(
77
coreSize: Int,
88
maxSize: Int,
9-
keepAlive: FiniteDuration = Duration(1000L, TimeUnit.MILLISECONDS),
9+
keepAlive: FiniteDuration = Duration(60000L, TimeUnit.MILLISECONDS),
1010
allowCoreThreadTimeout: Boolean = false
1111
)

0 commit comments

Comments
 (0)