Skip to content

Commit b094ed6

Browse files
committed
feat: Add jvm-execution, jvm-ssl and jvm-system modules
Implementation of JVM-related utilities for std in/out, clock, random number generation, SSL context initialization and thread pools. docs: Add section about jvm-* modules refactor: Add use of jvm-* modules to example build: Put test dependencies to separate object
1 parent 269fa98 commit b094ed6

File tree

25 files changed

+696
-6
lines changed

25 files changed

+696
-6
lines changed

build.sbt

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,20 @@ lazy val commonSettings = Seq(
1616
libraryDependencies ++= Seq(
1717
compilerPlugin(Dependencies.kindProjector),
1818
Dependencies.catsEffect,
19-
Dependencies.scalaTest
19+
Dependencies.Test.scalaTest
2020
),
2121
Test / publishArtifact := false
2222
)
2323

2424
lazy val root = (project in file("."))
25-
.aggregate(example, pureconfig)
25+
.aggregate(example, jvmExecution, jvmSsl, jvmSystem, pureconfig)
2626
.settings(
2727
name := "scala-server-toolkit",
2828
publish / skip := true
2929
)
3030

3131
lazy val example = project
32-
.dependsOn(pureconfig)
32+
.dependsOn(jvmExecution, jvmSsl, jvmSystem, pureconfig)
3333
.enablePlugins(MdocPlugin)
3434
.settings(
3535
commonSettings,
@@ -45,6 +45,25 @@ lazy val example = project
4545
)
4646
)
4747

48+
lazy val jvmExecution = (project in file("jvm-execution"))
49+
.settings(
50+
commonSettings,
51+
name := "scala-server-toolkit-jvm-execution",
52+
libraryDependencies += Dependencies.scalaLogging
53+
)
54+
55+
lazy val jvmSsl = (project in file("jvm-ssl"))
56+
.settings(
57+
commonSettings,
58+
name := "scala-server-toolkit-jvm-ssl"
59+
)
60+
61+
lazy val jvmSystem = (project in file("jvm-system"))
62+
.settings(
63+
commonSettings,
64+
name := "scala-server-toolkit-jvm-system"
65+
)
66+
4867
lazy val pureconfig = project
4968
.settings(
5069
commonSettings,

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Scala Server Toolkit Documentation
22

33
* [Getting Started](#getting-started)
4+
* [Modules JVM](jvm.md)
45
* [Module PureConfig](pureconfig.md)
56

67
## Getting Started

docs/jvm.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Modules JVM
2+
3+
![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)
4+
5+
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`
6+
7+
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out,
8+
time, random number generation, thread pools and SSL context initialization. The following modules are available:
9+
10+
* `scala-server-toolkit-jvm-execution`,
11+
* `scala-server-toolkit-jvm-ssl`,
12+
* `scala-server-toolkit-jvm-system`.
13+
14+
```scala
15+
import java.util.concurrent.TimeUnit
16+
import com.avast.server.toolkit.system.SystemModule
17+
import zio.interop.catz._
18+
import zio.Task
19+
20+
val program = for {
21+
systemModule <- SystemModule.make[Task]
22+
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS)
23+
randomNumber <- systemModule.random.nextInt
24+
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber")
25+
} yield ()
26+
// program: zio.ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@2d237460
27+
```
28+

docs/pureconfig.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import zio.Task
2020
final case class ServerConfiguration(listenAddress: String, listenPort: Int)
2121

2222
implicit val serverConfigurationReader: ConfigReader[ServerConfiguration] = deriveReader
23-
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@7d4e424e
23+
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@3e7940b3
2424

2525
val maybeConfiguration = PureConfigModule.make[Task, ServerConfiguration]
26-
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@e2498a3
26+
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@1ae37be2
2727
```
2828

example/src/main/mdoc/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Scala Server Toolkit Documentation
22

33
* [Getting Started](#getting-started)
4+
* [Modules JVM](jvm.md)
45
* [Module PureConfig](pureconfig.md)
56

67
## Getting Started

example/src/main/mdoc/jvm.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Modules JVM
2+
3+
![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)
4+
5+
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`
6+
7+
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out,
8+
time, random number generation, thread pools and SSL context initialization. The following modules are available:
9+
10+
* `scala-server-toolkit-jvm-execution`,
11+
* `scala-server-toolkit-jvm-ssl`,
12+
* `scala-server-toolkit-jvm-system`.
13+
14+
```scala mdoc
15+
import java.util.concurrent.TimeUnit
16+
import com.avast.server.toolkit.system.SystemModule
17+
import zio.interop.catz._
18+
import zio.Task
19+
20+
val program = for {
21+
systemModule <- SystemModule.make[Task]
22+
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS)
23+
randomNumber <- systemModule.random.nextInt
24+
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber")
25+
} yield ()
26+
```

example/src/main/scala/com/avast/server/toolkit/example/Main.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.avast.server.toolkit.example
22

3+
import java.util.concurrent.TimeUnit
4+
35
import cats.effect.Resource
46
import com.avast.server.toolkit.example.config.Configuration
7+
import com.avast.server.toolkit.execution.ExecutorModule
58
import com.avast.server.toolkit.pureconfig.PureConfigModule
9+
import com.avast.server.toolkit.system.SystemModule
610
import zio.interop.catz._
711
import zio.{Task, ZIO}
812

@@ -11,6 +15,14 @@ object Main extends CatsApp {
1115
def program: Resource[Task, Unit] = {
1216
for {
1317
configuration <- Resource.liftF(PureConfigModule.makeOrRaise[Task, Configuration])
18+
systemModule <- Resource.liftF(SystemModule.make[Task])
19+
executorModule <- ExecutorModule.make[Task](runtime.Platform.executor.asEC)
20+
currentTime <- Resource.liftF(systemModule.clock.realTime(TimeUnit.MILLISECONDS))
21+
_ <- Resource.liftF(
22+
systemModule
23+
.console
24+
.printLine(s"The current Unix epoch time is $currentTime. This system has ${executorModule.numOfCpus} CPUs.")
25+
)
1426
} yield ()
1527
}
1628

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import java.lang.Thread.UncaughtExceptionHandler
4+
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
5+
import java.util.concurrent.atomic.AtomicLong
6+
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
7+
8+
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config
9+
10+
/** Thread factory (both [[java.util.concurrent.ThreadFactory]] and [[java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory]])
11+
* that creates new threads according to the provided [[com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config]].
12+
*/
13+
class ConfigurableThreadFactory(config: Config) extends ThreadFactory with ForkJoinWorkerThreadFactory {
14+
15+
private val counter = new AtomicLong(0L)
16+
17+
override def newThread(r: Runnable): Thread = configure(new Thread(r))
18+
19+
override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
20+
configure(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
21+
}
22+
23+
private def configure[A <: Thread](thread: A) = {
24+
config.nameFormat.map(_.format(counter.getAndIncrement())).foreach(thread.setName)
25+
thread.setDaemon(config.daemon)
26+
thread.setPriority(config.priority)
27+
thread.setUncaughtExceptionHandler(config.uncaughtExceptionHandler)
28+
thread
29+
}
30+
31+
}
32+
33+
object ConfigurableThreadFactory {
34+
35+
/**
36+
* @param nameFormat Formatted with long number, e.g. my-thread-%02d
37+
*/
38+
final case class Config(nameFormat: Option[String] = None,
39+
daemon: Boolean = false,
40+
priority: Int = Thread.NORM_PRIORITY,
41+
uncaughtExceptionHandler: UncaughtExceptionHandler = LoggingUncaughtExceptionHandler)
42+
43+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
4+
import java.util.concurrent.{
5+
BlockingQueue,
6+
ExecutorService,
7+
ForkJoinPool,
8+
LinkedBlockingQueue,
9+
SynchronousQueue,
10+
ThreadFactory,
11+
ThreadPoolExecutor,
12+
TimeUnit
13+
}
14+
15+
import cats.effect.{Blocker, Resource, Sync}
16+
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config
17+
18+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
19+
import scala.language.higherKinds
20+
21+
/** Provides necessary executors - the default one for execution of your business logic and callbacks and special one designated for
22+
* blocking operations. Also allows you to create more executors if you need them.
23+
*/
24+
class ExecutorModule[F[_]: Sync](val numOfCpus: Int, val executor: ExecutionContext, blockingExecutor: ExecutionContextExecutorService) {
25+
module =>
26+
27+
/** Executor designated for blocking operations. */
28+
val blocker: Blocker = Blocker.liftExecutionContext(blockingExecutor)
29+
30+
/** [[java.util.concurrent.ExecutorService]] that can be used for blocking operations in Java-interop code. */
31+
lazy val blockingExecutorService: ExecutorService = blockingExecutor
32+
33+
/** Provides implicit reference to the default callback executor for easier use. */
34+
object Implicits {
35+
36+
implicit val executor: ExecutionContext = module.executor
37+
38+
}
39+
40+
/** Makes [[java.util.concurrent.ThreadPoolExecutor]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
41+
def makeThreadPoolExecutor(config: ThreadPoolExecutorConfig,
42+
threadFactory: ThreadFactory): Resource[F, ExecutionContextExecutorService] = {
43+
ExecutorModule.makeThreadPoolExecutor(config, threadFactory, new LinkedBlockingQueue).map(ExecutionContext.fromExecutorService)
44+
}
45+
46+
/** Makes [[java.util.concurrent.ForkJoinPool]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
47+
def makeForkJoinExecutor(config: ForkJoinPoolConfig,
48+
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ExecutionContextExecutorService] = {
49+
ExecutorModule.makeForkJoinPool(config, numOfCpus, threadFactory).map(ExecutionContext.fromExecutorService)
50+
}
51+
52+
}
53+
54+
object ExecutorModule {
55+
56+
/** Makes [[ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
57+
def make[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
58+
for {
59+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
60+
coreSize = numOfCpus * 2
61+
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
62+
.map(ExecutionContext.fromExecutorService)
63+
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
64+
} yield new ExecutorModule[F](numOfCpus, executor, be)
65+
}
66+
67+
/** Makes [[ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
68+
def make[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
69+
for {
70+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
71+
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
72+
} yield new ExecutorModule[F](numOfCpus, executor, be)
73+
}
74+
75+
/** Makes [[ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
76+
def make[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
77+
for {
78+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
79+
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
80+
.map(ExecutionContext.fromExecutorService)
81+
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
82+
} yield new ExecutorModule[F](numOfCpus, executor, be)
83+
}
84+
85+
/** Makes [[ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
86+
def makeWithForkJoin[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
87+
for {
88+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
89+
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
90+
.map(ExecutionContext.fromExecutorService)
91+
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
92+
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
93+
}
94+
95+
private def makeBlockingExecutor[F[_]: Sync] =
96+
makeThreadPoolExecutor[F](
97+
ThreadPoolExecutorConfig(0, Int.MaxValue),
98+
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
99+
new SynchronousQueue
100+
)
101+
102+
private def toolkitThreadFactory = new ConfigurableThreadFactory(Config(nameFormat = Some("default-async-%02d"), daemon = true))
103+
104+
private def makeThreadPoolExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig,
105+
threadFactory: ThreadFactory,
106+
queue: BlockingQueue[Runnable]): Resource[F, ThreadPoolExecutor] = {
107+
Resource.make {
108+
Sync[F].delay {
109+
val threadPool = new ThreadPoolExecutor(
110+
config.coreSize,
111+
config.maxSize,
112+
config.keepAlive.toMillis,
113+
TimeUnit.MILLISECONDS,
114+
queue,
115+
threadFactory
116+
)
117+
threadPool.allowCoreThreadTimeOut(true)
118+
119+
threadPool
120+
}
121+
}(pool => Sync[F].delay(pool.shutdown()))
122+
}
123+
124+
private def makeForkJoinPool[F[_]: Sync](config: ForkJoinPoolConfig,
125+
numOfCpus: Int,
126+
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
127+
Resource.make {
128+
Sync[F].delay {
129+
new ForkJoinPool(config.computeParallelism(numOfCpus), threadFactory, LoggingUncaughtExceptionHandler, config.computeAsyncMode)
130+
}
131+
}(pool => Sync[F].delay(pool.shutdown()))
132+
}
133+
134+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import ForkJoinPoolConfig.TaskPeekingMode
4+
import ForkJoinPoolConfig.TaskPeekingMode.{FIFO, LIFO}
5+
6+
final case class ForkJoinPoolConfig(parallelismMin: Int = 8,
7+
parallelismFactor: Double = 1.0,
8+
parallelismMax: Int = 64,
9+
taskPeekingMode: TaskPeekingMode = FIFO) {
10+
11+
private[toolkit] def computeParallelism(numOfCpus: Int): Int = {
12+
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax)
13+
}
14+
15+
private[toolkit] def computeAsyncMode: Boolean = taskPeekingMode match {
16+
case FIFO => true
17+
case LIFO => false
18+
}
19+
20+
}
21+
22+
object ForkJoinPoolConfig {
23+
24+
val Default: ForkJoinPoolConfig = ForkJoinPoolConfig()
25+
26+
sealed trait TaskPeekingMode
27+
28+
object TaskPeekingMode {
29+
30+
case object FIFO extends TaskPeekingMode
31+
32+
case object LIFO extends TaskPeekingMode
33+
34+
}
35+
36+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import java.lang.Thread.UncaughtExceptionHandler
4+
5+
import com.typesafe.scalalogging.LazyLogging
6+
7+
object LoggingUncaughtExceptionHandler extends UncaughtExceptionHandler with LazyLogging {
8+
9+
override def uncaughtException(t: Thread, ex: Throwable): Unit = logger.error(s"Uncaught exception on thread ${t.getName}", ex)
10+
11+
}

0 commit comments

Comments
 (0)