-
Notifications
You must be signed in to change notification settings - Fork 21
feat: Add jvm-execution, jvm-ssl and jvm-system modules #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Modules JVM | ||
|
||
 | ||
|
||
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"` | ||
|
||
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out, | ||
time, random number generation, thread pools and SSL context initialization. The following modules are available: | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
* `scala-server-toolkit-jvm-execution`, | ||
* `scala-server-toolkit-jvm-ssl`, | ||
* `scala-server-toolkit-jvm-system`. | ||
|
||
```scala | ||
import java.util.concurrent.TimeUnit | ||
import com.avast.server.toolkit.system.SystemModule | ||
import zio.interop.catz._ | ||
import zio.Task | ||
|
||
val program = for { | ||
systemModule <- SystemModule.make[Task] | ||
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS) | ||
randomNumber <- systemModule.random.nextInt | ||
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber") | ||
} yield () | ||
// program: zio.ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@7f42b194 | ||
``` | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Modules JVM | ||
|
||
 | ||
|
||
`libraryDependencies += "com.avast" %% "scala-server-toolkit-jvm-system" % "<VERSION>"` | ||
|
||
There is a set of `scala-server-toolkit-jvm-*` modules that provide pure implementations of JVM-related utilities such as standard in/out, | ||
time, random number generation, thread pools and SSL context initialization. The following modules are available: | ||
|
||
* `scala-server-toolkit-jvm-execution`, | ||
* `scala-server-toolkit-jvm-ssl`, | ||
* `scala-server-toolkit-jvm-system`. | ||
|
||
```scala mdoc | ||
import java.util.concurrent.TimeUnit | ||
import com.avast.server.toolkit.system.SystemModule | ||
import zio.interop.catz._ | ||
import zio.Task | ||
|
||
val program = for { | ||
systemModule <- SystemModule.make[Task] | ||
currentTime <- systemModule.clock.realTime(TimeUnit.MILLISECONDS) | ||
randomNumber <- systemModule.random.nextInt | ||
_ <- systemModule.console.printLine(s"Current Unix epoch time is $currentTime ms. Random number: $randomNumber") | ||
} yield () | ||
``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 43 additions & 0 deletions
43
...ecution/src/main/scala/com/avast/server/toolkit/execution/ConfigurableThreadFactory.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package com.avast.server.toolkit.execution | ||
|
||
import java.lang.Thread.UncaughtExceptionHandler | ||
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory | ||
import java.util.concurrent.atomic.AtomicLong | ||
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, ThreadFactory} | ||
|
||
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config | ||
|
||
/** Thread factory (both [[java.util.concurrent.ThreadFactory]] and [[java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory]]) | ||
* that creates new threads according to the provided [[com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config]]. | ||
*/ | ||
class ConfigurableThreadFactory(config: Config) extends ThreadFactory with ForkJoinWorkerThreadFactory { | ||
|
||
private val counter = new AtomicLong(0L) | ||
|
||
override def newThread(r: Runnable): Thread = configure(new Thread(r)) | ||
|
||
override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { | ||
configure(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)) | ||
} | ||
|
||
private def configure[A <: Thread](thread: A) = { | ||
config.nameFormat.map(_.format(counter.getAndIncrement())).foreach(thread.setName) | ||
thread.setDaemon(config.daemon) | ||
thread.setPriority(config.priority) | ||
thread.setUncaughtExceptionHandler(config.uncaughtExceptionHandler) | ||
thread | ||
} | ||
|
||
} | ||
|
||
object ConfigurableThreadFactory { | ||
|
||
/** | ||
* @param nameFormat Formatted with long number, e.g. my-thread-%02d | ||
*/ | ||
final case class Config(nameFormat: Option[String] = None, | ||
daemon: Boolean = false, | ||
priority: Int = Thread.NORM_PRIORITY, | ||
uncaughtExceptionHandler: UncaughtExceptionHandler = LoggingUncaughtExceptionHandler) | ||
|
||
} |
134 changes: 134 additions & 0 deletions
134
jvm-execution/src/main/scala/com/avast/server/toolkit/execution/ExecutorModule.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package com.avast.server.toolkit.execution | ||
|
||
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory | ||
import java.util.concurrent.{ | ||
BlockingQueue, | ||
ExecutorService, | ||
ForkJoinPool, | ||
LinkedBlockingQueue, | ||
SynchronousQueue, | ||
ThreadFactory, | ||
ThreadPoolExecutor, | ||
TimeUnit | ||
} | ||
|
||
import cats.effect.{Blocker, Resource, Sync} | ||
import com.avast.server.toolkit.execution.ConfigurableThreadFactory.Config | ||
|
||
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} | ||
import scala.language.higherKinds | ||
|
||
/** Provides necessary executors - the default one for execution of your business logic and callbacks and special one designated for | ||
* blocking operations. Also allows you to create more executors if you need them. | ||
*/ | ||
class ExecutorModule[F[_]: Sync](val numOfCpus: Int, val executor: ExecutionContext, blockingExecutor: ExecutionContextExecutorService) { | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
module => | ||
|
||
/** Executor designated for blocking operations. */ | ||
val blocker: Blocker = Blocker.liftExecutionContext(blockingExecutor) | ||
|
||
/** [[java.util.concurrent.ExecutorService]] that can be used for blocking operations in Java-interop code. */ | ||
lazy val blockingExecutorService: ExecutorService = blockingExecutor | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** Provides implicit reference to the default callback executor for easier use. */ | ||
object Implicits { | ||
|
||
implicit val executor: ExecutionContext = module.executor | ||
|
||
} | ||
|
||
/** Makes [[java.util.concurrent.ThreadPoolExecutor]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */ | ||
def makeThreadPoolExecutor(config: ThreadPoolExecutorConfig, | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
threadFactory: ThreadFactory): Resource[F, ExecutionContextExecutorService] = { | ||
ExecutorModule.makeThreadPoolExecutor(config, threadFactory, new LinkedBlockingQueue).map(ExecutionContext.fromExecutorService) | ||
} | ||
|
||
/** Makes [[java.util.concurrent.ForkJoinPool]] according to the given config and with [[java.util.concurrent.ThreadFactory]]. */ | ||
def makeForkJoinExecutor(config: ForkJoinPoolConfig, | ||
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ExecutionContextExecutorService] = { | ||
ExecutorModule.makeForkJoinPool(config, numOfCpus, threadFactory).map(ExecutionContext.fromExecutorService) | ||
} | ||
|
||
} | ||
|
||
object ExecutorModule { | ||
|
||
/** Makes [[ExecutorModule]] with default callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */ | ||
def make[F[_]: Sync]: Resource[F, ExecutorModule[F]] = { | ||
for { | ||
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors)) | ||
coreSize = numOfCpus * 2 | ||
executor <- makeThreadPoolExecutor(ThreadPoolExecutorConfig(coreSize, coreSize), toolkitThreadFactory, new LinkedBlockingQueue) | ||
.map(ExecutionContext.fromExecutorService) | ||
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) | ||
} yield new ExecutorModule[F](numOfCpus, executor, be) | ||
} | ||
|
||
/** Makes [[ExecutorModule]] with the provided callback executor and extra [[cats.effect.Blocker]] executor for blocking operations. */ | ||
def make[F[_]: Sync](executor: ExecutionContext): Resource[F, ExecutorModule[F]] = { | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for { | ||
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors)) | ||
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) | ||
} yield new ExecutorModule[F](numOfCpus, executor, be) | ||
} | ||
|
||
/** Makes [[ExecutorModule]] with executor and extra [[cats.effect.Blocker]] executor for blocking operations. */ | ||
def make[F[_]: Sync](executorConfig: ThreadPoolExecutorConfig): Resource[F, ExecutorModule[F]] = { | ||
for { | ||
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors)) | ||
executor <- makeThreadPoolExecutor(executorConfig, toolkitThreadFactory, new LinkedBlockingQueue) | ||
.map(ExecutionContext.fromExecutorService) | ||
be <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) | ||
} yield new ExecutorModule[F](numOfCpus, executor, be) | ||
} | ||
|
||
/** Makes [[ExecutorModule]] with fork-join executor and extra [[cats.effect.Blocker]] executor for blocking operations. */ | ||
def makeWithForkJoin[F[_]: Sync](executorConfig: ForkJoinPoolConfig): Resource[F, ExecutorModule[F]] = { | ||
for { | ||
numOfCpus <- Resource.liftF(Sync[F].delay(Runtime.getRuntime.availableProcessors)) | ||
executor <- makeForkJoinPool(executorConfig, numOfCpus, toolkitThreadFactory) | ||
.map(ExecutionContext.fromExecutorService) | ||
blockingExecutor <- makeBlockingExecutor.map(ExecutionContext.fromExecutorService) | ||
} yield new ExecutorModule[F](numOfCpus, executor, blockingExecutor) | ||
} | ||
|
||
private def makeBlockingExecutor[F[_]: Sync] = | ||
makeThreadPoolExecutor[F]( | ||
ThreadPoolExecutorConfig(0, Int.MaxValue), | ||
new ConfigurableThreadFactory(Config(nameFormat = Some("default-blocking-%02d"), daemon = true)), | ||
new SynchronousQueue | ||
) | ||
|
||
private def toolkitThreadFactory = new ConfigurableThreadFactory(Config(nameFormat = Some("default-async-%02d"), daemon = true)) | ||
|
||
private def makeThreadPoolExecutor[F[_]: Sync](config: ThreadPoolExecutorConfig, | ||
threadFactory: ThreadFactory, | ||
queue: BlockingQueue[Runnable]): Resource[F, ThreadPoolExecutor] = { | ||
Resource.make { | ||
Sync[F].delay { | ||
val threadPool = new ThreadPoolExecutor( | ||
config.coreSize, | ||
config.maxSize, | ||
config.keepAlive.toMillis, | ||
TimeUnit.MILLISECONDS, | ||
queue, | ||
threadFactory | ||
) | ||
threadPool.allowCoreThreadTimeOut(true) | ||
|
||
threadPool | ||
} | ||
}(pool => Sync[F].delay(pool.shutdown())) | ||
} | ||
|
||
private def makeForkJoinPool[F[_]: Sync](config: ForkJoinPoolConfig, | ||
numOfCpus: Int, | ||
threadFactory: ForkJoinWorkerThreadFactory): Resource[F, ForkJoinPool] = { | ||
Resource.make { | ||
Sync[F].delay { | ||
new ForkJoinPool(config.computeParallelism(numOfCpus), threadFactory, LoggingUncaughtExceptionHandler, config.computeAsyncMode) | ||
} | ||
}(pool => Sync[F].delay(pool.shutdown())) | ||
} | ||
|
||
} |
36 changes: 36 additions & 0 deletions
36
jvm-execution/src/main/scala/com/avast/server/toolkit/execution/ForkJoinPoolConfig.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package com.avast.server.toolkit.execution | ||
|
||
import ForkJoinPoolConfig.TaskPeekingMode | ||
import ForkJoinPoolConfig.TaskPeekingMode.{FIFO, LIFO} | ||
|
||
final case class ForkJoinPoolConfig(parallelismMin: Int = 8, | ||
parallelismFactor: Double = 1.0, | ||
parallelismMax: Int = 64, | ||
taskPeekingMode: TaskPeekingMode = FIFO) { | ||
|
||
private[toolkit] def computeParallelism(numOfCpus: Int): Int = { | ||
math.min(math.max(math.ceil(numOfCpus * parallelismFactor).toInt, parallelismMin), parallelismMax) | ||
} | ||
|
||
private[toolkit] def computeAsyncMode: Boolean = taskPeekingMode match { | ||
case FIFO => true | ||
case LIFO => false | ||
} | ||
|
||
} | ||
|
||
object ForkJoinPoolConfig { | ||
|
||
val Default: ForkJoinPoolConfig = ForkJoinPoolConfig() | ||
|
||
sealed trait TaskPeekingMode | ||
|
||
object TaskPeekingMode { | ||
|
||
case object FIFO extends TaskPeekingMode | ||
|
||
case object LIFO extends TaskPeekingMode | ||
|
||
} | ||
|
||
} |
11 changes: 11 additions & 0 deletions
11
...n/src/main/scala/com/avast/server/toolkit/execution/LoggingUncaughtExceptionHandler.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package com.avast.server.toolkit.execution | ||
|
||
import java.lang.Thread.UncaughtExceptionHandler | ||
|
||
import com.typesafe.scalalogging.LazyLogging | ||
|
||
object LoggingUncaughtExceptionHandler extends UncaughtExceptionHandler with LazyLogging { | ||
|
||
override def uncaughtException(t: Thread, ex: Throwable): Unit = logger.error(s"Uncaught exception on thread ${t.getName}", ex) | ||
jakubjanecek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.