Skip to content

feat: Add jvm-execution, jvm-ssl and jvm-system modules #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
compilerPlugin(Dependencies.kindProjector),
Dependencies.catsEffect,
Dependencies.scalaTest
Dependencies.Test.scalaTest
),
Test / publishArtifact := false
)

lazy val root = (project in file("."))
.aggregate(example, pureconfig)
lazy val root = project
.in(file("."))
.aggregate(example, jvmExecution, jvmSsl, jvmSystem, pureconfig)
.settings(
name := "scala-server-toolkit",
publish / skip := true
)

lazy val example = project
.dependsOn(pureconfig)
.dependsOn(jvmExecution, jvmSsl, jvmSystem, pureconfig)
.enablePlugins(MdocPlugin)
.settings(
commonSettings,
Expand All @@ -45,6 +46,28 @@ lazy val example = project
)
)

lazy val jvmExecution = project
.in(file("jvm-execution"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-execution",
libraryDependencies += Dependencies.slf4jApi
)

lazy val jvmSsl = project
.in(file("jvm-ssl"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-ssl"
)

lazy val jvmSystem = project
.in(file("jvm-system"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-system"
)

lazy val pureconfig = project
.settings(
commonSettings,
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [Getting Started](#getting-started)
* [Rationale](rationale.md)
* [Modules JVM](jvm.md)
* [Module PureConfig](pureconfig.md)

## Getting Started
Expand Down
33 changes: 33 additions & 0 deletions docs/jvm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Modules JVM

![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)

`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`

There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of different JVM-related utilities:

* `scala-server-toolkit-jvm-execution` - creation of thread pools,
* `scala-server-toolkit-jvm-ssl` - initialization of SSL context,
* `scala-server-toolkit-jvm-system` - standard in/out/err, random number generation.

```scala
import com.avast.server.toolkit.system.console.ConsoleModule
import com.avast.server.toolkit.system.random.RandomModule
import zio.interop.catz._
import zio.DefaultRuntime
import zio.Task

val program = for {
random <- RandomModule.makeRandom[Task]
randomNumber <- random.nextInt
console = ConsoleModule.make[Task]
_ <- console.printLine(s"Random number: $randomNumber")
} yield ()
// program: zio.ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@2f1f9515

val runtime = new DefaultRuntime {} // this is just in example
// runtime: AnyRef with DefaultRuntime = repl.Session$App$$anon$1@33ebe4f0 // this is just in example
runtime.unsafeRun(program)
// Random number: 776310297
```

4 changes: 2 additions & 2 deletions docs/pureconfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import zio.Task
final case class ServerConfiguration(listenAddress: String, listenPort: Int)

implicit val serverConfigurationReader: ConfigReader[ServerConfiguration] = deriveReader
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@3c04ddda
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@2a8eed58

val maybeConfiguration = PureConfigModule.make[Task, ServerConfiguration]
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@47b494ce
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@352bea0e
```

1 change: 1 addition & 0 deletions example/src/main/mdoc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [Getting Started](#getting-started)
* [Rationale](rationale.md)
* [Modules JVM](jvm.md)
* [Module PureConfig](pureconfig.md)

## Getting Started
Expand Down
29 changes: 29 additions & 0 deletions example/src/main/mdoc/jvm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Modules JVM

![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)

`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`

There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of different JVM-related utilities:

* `scala-server-toolkit-jvm-execution` - creation of thread pools,
* `scala-server-toolkit-jvm-ssl` - initialization of SSL context,
* `scala-server-toolkit-jvm-system` - standard in/out/err, random number generation.

```scala mdoc
import com.avast.server.toolkit.system.console.ConsoleModule
import com.avast.server.toolkit.system.random.RandomModule
import zio.interop.catz._
import zio.DefaultRuntime
import zio.Task

val program = for {
random <- RandomModule.makeRandom[Task]
randomNumber <- random.nextInt
console = ConsoleModule.make[Task]
_ <- console.printLine(s"Random number: $randomNumber")
} yield ()

val runtime = new DefaultRuntime {} // this is just in example
runtime.unsafeRun(program)
```
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package com.avast.server.toolkit.example

import cats.effect.Resource
import java.util.concurrent.TimeUnit

import cats.effect.{Clock, Resource}
import com.avast.server.toolkit.example.config.Configuration
import com.avast.server.toolkit.execution.ExecutorModule
import com.avast.server.toolkit.pureconfig.PureConfigModule
import com.avast.server.toolkit.system.console.{Console, ConsoleModule}
import zio.interop.catz._
import zio.{Task, ZIO}

Expand All @@ -11,6 +15,13 @@ object Main extends CatsApp {
def program: Resource[Task, Unit] = {
for {
configuration <- Resource.liftF(PureConfigModule.makeOrRaise[Task, Configuration])
executorModule <- ExecutorModule.makeFromExecutionContext[Task](runtime.Platform.executor.asEC)
clock = Clock.create[Task]
currentTime <- Resource.liftF(clock.realTime(TimeUnit.MILLISECONDS))
console <- Resource.pure[Task, Console[Task]](ConsoleModule.make[Task])
_ <- Resource.liftF(
console.printLine(s"The current Unix epoch time is $currentTime. This system has ${executorModule.numOfCpus} CPUs.")
)
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.avast.server.toolkit.execution

import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}

import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config

/** Thread factory (both [[java.util.concurrent.ThreadFactory]] and [[java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory]])
* that creates new threads according to the provided [[com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config]].
*/
class ConfigurableThreadFactory(config: Config) extends ThreadFactory with ForkJoinWorkerThreadFactory {

private val counter = new AtomicLong(0L)

override def newThread(r: Runnable): Thread = configure(new Thread(r))

override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
configure(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
}

private def configure[A <: Thread](thread: A) = {
config.nameFormat.map(_.format(counter.getAndIncrement())).foreach(thread.setName)
thread.setDaemon(config.daemon)
thread.setPriority(config.priority)
thread.setUncaughtExceptionHandler(config.uncaughtExceptionHandler)
thread
}

}

object ConfigurableThreadFactory {

/**
* @param nameFormat Formatted with long number, e.g. my-thread-%02d
*/
final case class Config(nameFormat: Option[String] = None,
daemon: Boolean = false,
priority: Int = Thread.NORM_PRIORITY,
uncaughtExceptionHandler: UncaughtExceptionHandler = LoggingUncaughtExceptionHandler)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.avast.server.toolkit.execution

import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
import java.util.concurrent.{
BlockingQueue,
ExecutorService,
ForkJoinPool,
LinkedBlockingQueue,
SynchronousQueue,
ThreadFactory,
ThreadPoolExecutor,
TimeUnit
}

import cats.effect.{Blocker, Resource, Sync}
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.language.higherKinds

/** Provides necessary executors - the default one for execution of your business logic and callbacks and special one designated for
* blocking operations. Also allows you to create more executors if you need them.
*/
class ExecutorModule[F[_]: Sync](val numOfCpus: Int,
val executionContext: ExecutionContext,
blockingExecutor: ExecutionContextExecutorService) {
module =>

/** Executor designated for blocking operations. */
val blocker: Blocker = Blocker.liftExecutionContext(blockingExecutor)

/** [[java.util.concurrent.ExecutorService]] that can be used for blocking operations in Java-interop code. */
val blockingExecutorService: ExecutorService = blockingExecutor

/** Provides implicit reference to the default callback executor for easier use. */
object Implicits {

implicit val executionContext: ExecutionContext = module.executionContext

}

/** Makes [[java.util.concurrent.ThreadPoolExecutor]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
def makeThreadPoolExecutor(config: ThreadPoolExecutorConfig, threadFactory: ThreadFactory): Resource[F, ThreadPoolExecutor] = {
ExecutorModule.makeThreadPoolExecutor(config, threadFactory, new LinkedBlockingQueue)
}

/** Makes [[java.util.concurrent.ForkJoinPool]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
def makeForkJoinPool(config: ForkJoinPoolConfig, threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
ExecutorModule.makeForkJoinPool(config, numOfCpus, threadFactory)
}

}

object ExecutorModule {

/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeDefault[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
coreSize = numOfCpus * 2
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]]
* executor for blocking operations.
*/
def makeFromExecutionContext[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeFromConfig[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

/** Makes [[com.avast.server.toolkit.execution.ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor
* for blocking operations.
*/
def makeForkJoinFromConfig[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

private def makeBlockingExecutor[F[_]: Sync] =
makeThreadPoolExecutor[F](
ThreadPoolExecutorConfig(0, Int.MaxValue),
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
new SynchronousQueue
)

private def toolkitThreadFactory = new ConfigurableThreadFactory(Config(nameFormat = Some("default-async-%02d"), daemon = true))

private def makeThreadPoolExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig,
threadFactory: ThreadFactory,
queue: BlockingQueue[Runnable]): Resource[F, ThreadPoolExecutor] = {
Resource.make {
Sync[F].delay {
val threadPool = new ThreadPoolExecutor(
config.coreSize,
config.maxSize,
config.keepAlive.toMillis,
TimeUnit.MILLISECONDS,
queue,
threadFactory
)
threadPool.allowCoreThreadTimeOut(true)

threadPool
}
}(pool => Sync[F].delay(pool.shutdown()))
}

private def makeForkJoinPool[F[_]: Sync](config: ForkJoinPoolConfig,
numOfCpus: Int,
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
Resource.make {
Sync[F].delay {
new ForkJoinPool(config.computeParallelism(numOfCpus), threadFactory, LoggingUncaughtExceptionHandler, config.computeAsyncMode)
}
}(pool => Sync[F].delay(pool.shutdown()))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.avast.server.toolkit.execution

import ForkJoinPoolConfig.TaskPeekingMode
import ForkJoinPoolConfig.TaskPeekingMode.{FIFO, LIFO}

final case class ForkJoinPoolConfig(parallelismMin: Int = 8,
parallelismFactor: Double = 1.0,
parallelismMax: Int = 64,
taskPeekingMode: TaskPeekingMode = FIFO) {

private[toolkit] def computeParallelism(numOfCpus: Int): Int = {
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax)
}

private[toolkit] def computeAsyncMode: Boolean = taskPeekingMode match {
case FIFO => true
case LIFO => false
}

}

object ForkJoinPoolConfig {

val Default: ForkJoinPoolConfig = ForkJoinPoolConfig()

sealed trait TaskPeekingMode

object TaskPeekingMode {

case object FIFO extends TaskPeekingMode

case object LIFO extends TaskPeekingMode

}

}
Loading