Skip to content

feat: Add jvm-execution, jvm-ssl and jvm-system modules #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
compilerPlugin(Dependencies.kindProjector),
Dependencies.catsEffect,
Dependencies.scalaTest
Dependencies.Test.scalaTest
),
Test / publishArtifact := false
)

lazy val root = (project in file("."))
.aggregate(example, pureconfig)
.aggregate(example, jvmExecution, jvmSsl, jvmSystem, pureconfig)
.settings(
name := "scala-server-toolkit",
publish / skip := true
)

lazy val example = project
.dependsOn(pureconfig)
.dependsOn(jvmExecution, jvmSsl, jvmSystem, pureconfig)
.enablePlugins(MdocPlugin)
.settings(
commonSettings,
Expand All @@ -45,6 +45,25 @@ lazy val example = project
)
)

lazy val jvmExecution = (project in file("jvm-execution"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-execution",
libraryDependencies += Dependencies.scalaLogging
)

lazy val jvmSsl = (project in file("jvm-ssl"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-ssl"
)

lazy val jvmSystem = (project in file("jvm-system"))
.settings(
commonSettings,
name := "scala-server-toolkit-jvm-system"
)

lazy val pureconfig = project
.settings(
commonSettings,
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [Getting Started](#getting-started)
* [Rationale](rationale.md)
* [Modules JVM](jvm.md)
* [Module PureConfig](pureconfig.md)

## Getting Started
Expand Down
28 changes: 28 additions & 0 deletions docs/jvm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Modules JVM

![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)

`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`

There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out,
time, random number generation, thread pools and SSL context initialization. The following modules are available:

* `scala-server-toolkit-jvm-execution`,
* `scala-server-toolkit-jvm-ssl`,
* `scala-server-toolkit-jvm-system`.

```scala
import java.util.concurrent.TimeUnit
import com.avast.server.toolkit.system.SystemModule
import zio.interop.catz._
import zio.Task

val program = for {
systemModule <- SystemModule.make[Task]
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS)
randomNumber <- systemModule.random.nextInt
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber")
} yield ()
// program: zio.ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@7f42b194
```

4 changes: 2 additions & 2 deletions docs/pureconfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import zio.Task
final case class ServerConfiguration(listenAddress: String, listenPort: Int)

implicit val serverConfigurationReader: ConfigReader[ServerConfiguration] = deriveReader
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@3c04ddda
// serverConfigurationReader: ConfigReader[ServerConfiguration] = pureconfig.generic.DerivedConfigReader1$$anon$3@1a9e2c3b

val maybeConfiguration = PureConfigModule.make[Task, ServerConfiguration]
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@47b494ce
// maybeConfiguration: Task[Either[cats.data.NonEmptyList[String], ServerConfiguration]] = zio.ZIO$EffectPartial@5f92e5eb
```

1 change: 1 addition & 0 deletions example/src/main/mdoc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [Getting Started](#getting-started)
* [Rationale](rationale.md)
* [Modules JVM](jvm.md)
* [Module PureConfig](pureconfig.md)

## Getting Started
Expand Down
26 changes: 26 additions & 0 deletions example/src/main/mdoc/jvm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Modules JVM

![Maven Central](https://img.shields.io/maven-central/v/com.avast/scala-server-toolkit-jvm-system_2.13)

`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"`

There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out,
time, random number generation, thread pools and SSL context initialization. The following modules are available:

* `scala-server-toolkit-jvm-execution`,
* `scala-server-toolkit-jvm-ssl`,
* `scala-server-toolkit-jvm-system`.

```scala mdoc
import java.util.concurrent.TimeUnit
import com.avast.server.toolkit.system.SystemModule
import zio.interop.catz._
import zio.Task

val program = for {
systemModule <- SystemModule.make[Task]
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS)
randomNumber <- systemModule.random.nextInt
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber")
} yield ()
```
12 changes: 12 additions & 0 deletions example/src/main/scala/com/avast/server/toolkit/example/Main.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package com.avast.server.toolkit.example

import java.util.concurrent.TimeUnit

import cats.effect.Resource
import com.avast.server.toolkit.example.config.Configuration
import com.avast.server.toolkit.execution.ExecutorModule
import com.avast.server.toolkit.pureconfig.PureConfigModule
import com.avast.server.toolkit.system.SystemModule
import zio.interop.catz._
import zio.{Task, ZIO}

Expand All @@ -11,6 +15,14 @@ object Main extends CatsApp {
def program: Resource[Task, Unit] = {
for {
configuration <- Resource.liftF(PureConfigModule.makeOrRaise[Task, Configuration])
systemModule <- Resource.liftF(SystemModule.make[Task])
executorModule <- ExecutorModule.make[Task](runtime.Platform.executor.asEC)
currentTime <- Resource.liftF(systemModule.clock.realTime(TimeUnit.MILLISECONDS))
_ <- Resource.liftF(
systemModule
.console
.printLine(s"The current Unix epoch time is $currentTime. This system has ${executorModule.numOfCpus} CPUs.")
)
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.avast.server.toolkit.execution

import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}

import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config

/** Thread factory (both [[java.util.concurrent.ThreadFactory]] and [[java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory]])
* that creates new threads according to the provided [[com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config]].
*/
class ConfigurableThreadFactory(config: Config) extends ThreadFactory with ForkJoinWorkerThreadFactory {

private val counter = new AtomicLong(0L)

override def newThread(r: Runnable): Thread = configure(new Thread(r))

override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
configure(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
}

private def configure[A <: Thread](thread: A) = {
config.nameFormat.map(_.format(counter.getAndIncrement())).foreach(thread.setName)
thread.setDaemon(config.daemon)
thread.setPriority(config.priority)
thread.setUncaughtExceptionHandler(config.uncaughtExceptionHandler)
thread
}

}

object ConfigurableThreadFactory {

/**
* @param nameFormat Formatted with long number, e.g. my-thread-%02d
*/
final case class Config(nameFormat: Option[String] = None,
daemon: Boolean = false,
priority: Int = Thread.NORM_PRIORITY,
uncaughtExceptionHandler: UncaughtExceptionHandler = LoggingUncaughtExceptionHandler)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.avast.server.toolkit.execution

import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
import java.util.concurrent.{
BlockingQueue,
ExecutorService,
ForkJoinPool,
LinkedBlockingQueue,
SynchronousQueue,
ThreadFactory,
ThreadPoolExecutor,
TimeUnit
}

import cats.effect.{Blocker, Resource, Sync}
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.language.higherKinds

/** Provides necessary executors - the default one for execution of your business logic and callbacks and special one designated for
* blocking operations. Also allows you to create more executors if you need them.
*/
class ExecutorModule[F[_]: Sync](val numOfCpus: Int, val executor: ExecutionContext, blockingExecutor: ExecutionContextExecutorService) {
module =>

/** Executor designated for blocking operations. */
val blocker: Blocker = Blocker.liftExecutionContext(blockingExecutor)

/** [[java.util.concurrent.ExecutorService]] that can be used for blocking operations in Java-interop code. */
lazy val blockingExecutorService: ExecutorService = blockingExecutor

/** Provides implicit reference to the default callback executor for easier use. */
object Implicits {

implicit val executor: ExecutionContext = module.executor

}

/** Makes [[java.util.concurrent.ThreadPoolExecutor]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
def makeThreadPoolExecutor(config: ThreadPoolExecutorConfig,
threadFactory: ThreadFactory): Resource[F, ExecutionContextExecutorService] = {
ExecutorModule.makeThreadPoolExecutor(config, threadFactory, new LinkedBlockingQueue).map(ExecutionContext.fromExecutorService)
}

/** Makes [[java.util.concurrent.ForkJoinPool]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */
def makeForkJoinExecutor(config: ForkJoinPoolConfig,
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ExecutionContextExecutorService] = {
ExecutorModule.makeForkJoinPool(config, numOfCpus, threadFactory).map(ExecutionContext.fromExecutorService)
}

}

object ExecutorModule {

/** Makes [[ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
def make[F[_]: Sync]: Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
coreSize = numOfCpus * 2
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue)
.map(ExecutionContext.fromExecutorService)
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, be)
}

/** Makes [[ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
def make[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, be)
}

/** Makes [[ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
def make[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue)
.map(ExecutionContext.fromExecutorService)
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, be)
}

/** Makes [[ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor for blocking operations. */
def makeWithForkJoin[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = {
for {
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors))
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory)
.map(ExecutionContext.fromExecutorService)
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService)
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor)
}

private def makeBlockingExecutor[F[_]: Sync] =
makeThreadPoolExecutor[F](
ThreadPoolExecutorConfig(0, Int.MaxValue),
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)),
new SynchronousQueue
)

private def toolkitThreadFactory = new ConfigurableThreadFactory(Config(nameFormat = Some("default-async-%02d"), daemon = true))

private def makeThreadPoolExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig,
threadFactory: ThreadFactory,
queue: BlockingQueue[Runnable]): Resource[F, ThreadPoolExecutor] = {
Resource.make {
Sync[F].delay {
val threadPool = new ThreadPoolExecutor(
config.coreSize,
config.maxSize,
config.keepAlive.toMillis,
TimeUnit.MILLISECONDS,
queue,
threadFactory
)
threadPool.allowCoreThreadTimeOut(true)

threadPool
}
}(pool => Sync[F].delay(pool.shutdown()))
}

private def makeForkJoinPool[F[_]: Sync](config: ForkJoinPoolConfig,
numOfCpus: Int,
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = {
Resource.make {
Sync[F].delay {
new ForkJoinPool(config.computeParallelism(numOfCpus), threadFactory, LoggingUncaughtExceptionHandler, config.computeAsyncMode)
}
}(pool => Sync[F].delay(pool.shutdown()))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.avast.server.toolkit.execution

import ForkJoinPoolConfig.TaskPeekingMode
import ForkJoinPoolConfig.TaskPeekingMode.{FIFO, LIFO}

final case class ForkJoinPoolConfig(parallelismMin: Int = 8,
parallelismFactor: Double = 1.0,
parallelismMax: Int = 64,
taskPeekingMode: TaskPeekingMode = FIFO) {

private[toolkit] def computeParallelism(numOfCpus: Int): Int = {
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax)
}

private[toolkit] def computeAsyncMode: Boolean = taskPeekingMode match {
case FIFO => true
case LIFO => false
}

}

object ForkJoinPoolConfig {

val Default: ForkJoinPoolConfig = ForkJoinPoolConfig()

sealed trait TaskPeekingMode

object TaskPeekingMode {

case object FIFO extends TaskPeekingMode

case object LIFO extends TaskPeekingMode

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.avast.server.toolkit.execution

import java.lang.Thread.UncaughtExceptionHandler

import com.typesafe.scalalogging.LazyLogging

object LoggingUncaughtExceptionHandler extends UncaughtExceptionHandler with LazyLogging {

override def uncaughtException(t: Thread, ex: Throwable): Unit = logger.error(s"Uncaught exception on thread ${t.getName}", ex)

}
Loading