Skip to content

Commit e958b10

Browse files
committed
Merge remote-tracking branch 'upstream/master' into pimp-my-sbt
# Conflicts: # build.sbt # example/src/main/scala/com/avast/server/toolkit/example/Main.scala # project/Dependencies.scala
2 parents 3a90e5d + 5b50d3c commit e958b10

File tree

27 files changed

+706
-7
lines changed

27 files changed

+706
-7
lines changed

build.sbt

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ lazy val commonSettings = Seq(
1919
compilerPlugin(scalafixSemanticdb), // for scalafix
2020
Dependencies.silencerLib,
2121
Dependencies.catsEffect,
22-
Dependencies.scalaTest % Test
22+
Dependencies.Test.scalaTest % Test
2323
),
2424
Compile / compile / wartremoverErrors ++= Warts.all filterNot Set(
2525
Wart.Nothing, // keep, false positives all around
@@ -57,14 +57,14 @@ lazy val commonSettings = Seq(
5757

5858
lazy val root = project
5959
.in(file("."))
60-
.aggregate(example, pureconfig)
60+
.aggregate(example, jvmExecution, jvmSsl, jvmSystem, pureconfig)
6161
.settings(
6262
name := "scala-server-toolkit",
6363
publish / skip := true
6464
)
6565

6666
lazy val example = project
67-
.dependsOn(pureconfig)
67+
.dependsOn(jvmExecution, jvmSsl, jvmSystem, pureconfig)
6868
.enablePlugins(MdocPlugin)
6969
.settings(commonSettings)
7070
.settings(
@@ -80,6 +80,28 @@ lazy val example = project
8080
)
8181
)
8282

83+
lazy val jvmExecution = project
84+
.in(file("jvm-execution"))
85+
.settings(
86+
commonSettings,
87+
name := "scala-server-toolkit-jvm-execution",
88+
libraryDependencies += Dependencies.slf4jApi
89+
)
90+
91+
lazy val jvmSsl = project
92+
.in(file("jvm-ssl"))
93+
.settings(
94+
commonSettings,
95+
name := "scala-server-toolkit-jvm-ssl"
96+
)
97+
98+
lazy val jvmSystem = project
99+
.in(file("jvm-system"))
100+
.settings(
101+
commonSettings,
102+
name := "scala-server-toolkit-jvm-system"
103+
)
104+
83105
lazy val pureconfig = project
84106
.settings(commonSettings)
85107
.settings(

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
* [Getting Started](#getting-started)
44
* [Rationale](rationale.md)
5+
* [Modules JVM](jvm.md)
56
* [Module PureConfig](pureconfig.md)
67

78
## Getting Started

docs/jvm.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Modules JVM
2+
3+
![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)
4+
5+
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`
6+
7+
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of different JVM-related utilities:
8+
9+
* `scala-server-toolkit-jvm-execution` - creation of thread pools,
10+
* `scala-server-toolkit-jvm-ssl` - initialization of SSL context,
11+
* `scala-server-toolkit-jvm-system` - standard in/out/err, random number generation.
12+
13+
```scala
14+
import com.avast.server.toolkit.system.console.ConsoleModule
15+
import com.avast.server.toolkit.system.random.RandomModule
16+
import zio.interop.catz._
17+
import zio.DefaultRuntime
18+
import zio.Task
19+
20+
val program = for {
21+
random <- RandomModule.makeRandom[Task]
22+
randomNumber <- random.nextInt
23+
console = ConsoleModule.make[Task]
24+
_ <- console.printLine(s"Random number: $randomNumber")
25+
} yield ()
26+
// program: zio.ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@2f1f9515
27+
28+
val runtime = new DefaultRuntime {} // this is just in example
29+
// runtime: AnyRef with DefaultRuntime = repl.Session$App$$anon$1@33ebe4f0 // this is just in example
30+
runtime.unsafeRun(program)
31+
// Random number: 776310297
32+
```
33+

docs/pureconfig.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import zio.Task
2020
final case class ServerConfiguration(listenAddress: String, listenPort: Int)
2121

2222
implicit val serverConfigurationReader: ConfigReader[ServerConfiguration] = deriveReader
23-
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@3c04ddda
23+
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@2a8eed58
2424

2525
val maybeConfiguration = PureConfigModule.make[Task, ServerConfiguration]
26-
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@47b494ce
26+
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@352bea0e
2727
```
2828

example/src/main/mdoc/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
* [Getting Started](#getting-started)
44
* [Rationale](rationale.md)
5+
* [Modules JVM](jvm.md)
56
* [Module PureConfig](pureconfig.md)
67

78
## Getting Started

example/src/main/mdoc/jvm.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Modules JVM
2+
3+
![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)
4+
5+
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`
6+
7+
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of different JVM-related utilities:
8+
9+
* `scala-server-toolkit-jvm-execution` - creation of thread pools,
10+
* `scala-server-toolkit-jvm-ssl` - initialization of SSL context,
11+
* `scala-server-toolkit-jvm-system` - standard in/out/err, random number generation.
12+
13+
```scala mdoc
14+
import com.avast.server.toolkit.system.console.ConsoleModule
15+
import com.avast.server.toolkit.system.random.RandomModule
16+
import zio.interop.catz._
17+
import zio.DefaultRuntime
18+
import zio.Task
19+
20+
val program = for {
21+
random <- RandomModule.makeRandom[Task]
22+
randomNumber <- random.nextInt
23+
console = ConsoleModule.make[Task]
24+
_ <- console.printLine(s"Random number: $randomNumber")
25+
} yield ()
26+
27+
val runtime = new DefaultRuntime {} // this is just in example
28+
runtime.unsafeRun(program)
29+
```

example/src/main/scala/com/avast/server/toolkit/example/Main.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package com.avast.server.toolkit.example
22

3-
import cats.effect.Resource
3+
import java.util.concurrent.TimeUnit
4+
5+
import cats.effect.{Clock, Resource}
46
import com.avast.server.toolkit.example.config.Configuration
7+
import com.avast.server.toolkit.execution.ExecutorModule
58
import com.avast.server.toolkit.pureconfig.PureConfigModule
9+
import com.avast.server.toolkit.system.console.{Console, ConsoleModule}
610
import com.github.ghik.silencer.silent
711
import zio.interop.catz._
812
import zio.{Task, ZIO}
@@ -12,6 +16,13 @@ object Main extends CatsApp {
1216
def program: Resource[Task, Unit] = {
1317
for {
1418
_ <- Resource.liftF(PureConfigModule.makeOrRaise[Task, Configuration])
19+
executorModule <- ExecutorModule.makeFromExecutionContext[Task](runtime.Platform.executor.asEC)
20+
clock = Clock.create[Task]
21+
currentTime <- Resource.liftF(clock.realTime(TimeUnit.MILLISECONDS))
22+
console <- Resource.pure[Task, Console[Task]](ConsoleModule.make[Task])
23+
_ <- Resource.liftF(
24+
console.printLine(s"The current Unix epoch time is $currentTime. This system has ${executorModule.numOfCpus} CPUs.")
25+
)
1526
} yield ()
1627
}
1728

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import java.lang.Thread.UncaughtExceptionHandler
4+
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
5+
import java.util.concurrent.atomic.AtomicLong
6+
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
7+
8+
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config
9+
10+
/** Thread factory (both [[java.util.concurrent.ThreadFactory]] and [[java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory]])
11+
* that creates new threads according to the provided [[com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config]].
12+
*/
13+
class ConfigurableThreadFactory(config: Config) extends ThreadFactory with ForkJoinWorkerThreadFactory {
14+
15+
private val counter = new AtomicLong(0L)
16+
17+
override def newThread(r: Runnable): Thread = configure(new Thread(r))
18+
19+
override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
20+
configure(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
21+
}
22+
23+
private def configure[A <: Thread](thread: A) = {
24+
config.nameFormat.map(_.format(counter.getAndIncrement())).foreach(thread.setName)
25+
thread.setDaemon(config.daemon)
26+
thread.setPriority(config.priority)
27+
thread.setUncaughtExceptionHandler(config.uncaughtExceptionHandler)
28+
thread
29+
}
30+
31+
}
32+
33+
object ConfigurableThreadFactory {
34+
35+
/**
36+
* @param nameFormat Formatted with long number, e.g. my-thread-%02d
37+
*/
38+
final case class Config(nameFormat: Option[String] = None,
39+
daemon: Boolean = false,
40+
priority: Int = Thread.NORM_PRIORITY,
41+
uncaughtExceptionHandler: UncaughtExceptionHandler = LoggingUncaughtExceptionHandler)
42+
43+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
4+
import java.util.concurrent.{
5+
BlockingQueue,
6+
ExecutorService,
7+
ForkJoinPool,
8+
LinkedBlockingQueue,
9+
SynchronousQueue,
10+
ThreadFactory,
11+
ThreadPoolExecutor,
12+
TimeUnit
13+
}
14+
15+
import cats.effect.{Blocker, Resource, Sync}
16+
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config
17+
18+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
19+
import scala.language.higherKinds
20+
21+
/** Provides necessary executors - the default one for execution of your business logic and callbacks and special one designated for
22+
* blocking operations. Also allows you to create more executors if you need them.
23+
*/
24+
class ExecutorModule[F[_]: Sync](val numOfCpus: Int,
25+
val executionContext: ExecutionContext,
26+
blockingExecutor: ExecutionContextExecutorService) {
27+
module =>
28+
29+
/** Executor designated for blocking operations. */
30+
val blocker: Blocker = Blocker.liftExecutionContext(blockingExecutor)
31+
32+
/** [[java.util.concurrent.ExecutorService]] that can be used for blocking operations in Java-interop code. */
33+
val blockingExecutorService: ExecutorService = blockingExecutor
34+
35+
/** Provides implicit reference to the default callback executor for easier use. */
36+
object Implicits {
37+
38+
implicit val executionContext: ExecutionContext = module.executionContext
39+
40+
}
41+
42+
/** Makes [[java.util.concurrent.ThreadPoolExecutor]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
43+
def makeThreadPoolExecutor(config: ThreadPoolExecutorConfig, threadFactory: ThreadFactory): Resource[F, ThreadPoolExecutor] = {
44+
ExecutorModule.makeThreadPoolExecutor(config, threadFactory, new LinkedBlockingQueue)
45+
}
46+
47+
/** Makes [[java.util.concurrent.ForkJoinPool]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
48+
def makeForkJoinPool(config: ForkJoinPoolConfig, threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
49+
ExecutorModule.makeForkJoinPool(config, numOfCpus, threadFactory)
50+
}
51+
52+
}
53+
54+
object ExecutorModule {
55+
56+
/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor
57+
* for blocking operations.
58+
*/
59+
def makeDefault[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
60+
for {
61+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
62+
coreSize = numOfCpus * 2
63+
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
64+
.map(ExecutionContext.fromExecutorService)
65+
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
66+
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
67+
}
68+
69+
/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]]
70+
* executor for blocking operations.
71+
*/
72+
def makeFromExecutionContext[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
73+
for {
74+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
75+
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
76+
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
77+
}
78+
79+
/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor
80+
* for blocking operations.
81+
*/
82+
def makeFromConfig[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
83+
for {
84+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
85+
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
86+
.map(ExecutionContext.fromExecutorService)
87+
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
88+
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
89+
}
90+
91+
/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor
92+
* for blocking operations.
93+
*/
94+
def makeForkJoinFromConfig[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
95+
for {
96+
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
97+
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
98+
.map(ExecutionContext.fromExecutorService)
99+
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
100+
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
101+
}
102+
103+
private def makeBlockingExecutor[F[_]: Sync] =
104+
makeThreadPoolExecutor[F](
105+
ThreadPoolExecutorConfig(0, Int.MaxValue),
106+
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
107+
new SynchronousQueue
108+
)
109+
110+
private def toolkitThreadFactory = new ConfigurableThreadFactory(Config(nameFormat = Some("default-async-%02d"), daemon = true))
111+
112+
private def makeThreadPoolExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig,
113+
threadFactory: ThreadFactory,
114+
queue: BlockingQueue[Runnable]): Resource[F, ThreadPoolExecutor] = {
115+
Resource.make {
116+
Sync[F].delay {
117+
val threadPool = new ThreadPoolExecutor(
118+
config.coreSize,
119+
config.maxSize,
120+
config.keepAlive.toMillis,
121+
TimeUnit.MILLISECONDS,
122+
queue,
123+
threadFactory
124+
)
125+
threadPool.allowCoreThreadTimeOut(true)
126+
127+
threadPool
128+
}
129+
}(pool => Sync[F].delay(pool.shutdown()))
130+
}
131+
132+
private def makeForkJoinPool[F[_]: Sync](config: ForkJoinPoolConfig,
133+
numOfCpus: Int,
134+
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
135+
Resource.make {
136+
Sync[F].delay {
137+
new ForkJoinPool(config.computeParallelism(numOfCpus), threadFactory, LoggingUncaughtExceptionHandler, config.computeAsyncMode)
138+
}
139+
}(pool => Sync[F].delay(pool.shutdown()))
140+
}
141+
142+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.avast.server.toolkit.execution
2+
3+
import ForkJoinPoolConfig.TaskPeekingMode
4+
import ForkJoinPoolConfig.TaskPeekingMode.{FIFO, LIFO}
5+
6+
final case class ForkJoinPoolConfig(parallelismMin: Int = 8,
7+
parallelismFactor: Double = 1.0,
8+
parallelismMax: Int = 64,
9+
taskPeekingMode: TaskPeekingMode = FIFO) {
10+
11+
private[toolkit] def computeParallelism(numOfCpus: Int): Int = {
12+
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax)
13+
}
14+
15+
private[toolkit] def computeAsyncMode: Boolean = taskPeekingMode match {
16+
case FIFO => true
17+
case LIFO => false
18+
}
19+
20+
}
21+
22+
object ForkJoinPoolConfig {
23+
24+
val Default: ForkJoinPoolConfig = ForkJoinPoolConfig()
25+
26+
sealed trait TaskPeekingMode
27+
28+
object TaskPeekingMode {
29+
30+
case object FIFO extends TaskPeekingMode
31+
32+
case object LIFO extends TaskPeekingMode
33+
34+
}
35+
36+
}

0 commit comments

Comments
 (0)