Skip to content

Commit 4496c5d

Browse files
committed
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington - add Promise.isCompleted - add Future.successful and Future.failed - add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop - remove defaultExecutionContext as default parameter value from promise and future - add ExecutionContext.Implicits.global which must be explicitly imported, rather than the previous always-available value for the implicit EC - remove currentExecutionContext, since it could create bugs by being out of sync with the implicit ExecutionContext - remove Future task batching (_taskStack) and Future.releaseStack This optimization should instead be implemented either in a specific thread pool or in a specific ExecutionContext. Some pools or ExecutionContexts may not want or need it. In this patch, the defaultExecutionContext does not keep the batching optimization. Whether it should have it should perhaps be determined through benchmarking. - move internalBlockingCall to BlockContext and remove currentExecutionContext In this patch, BlockContext must be implemented by Thread.currentThread, so the thread pool is the only place you can add custom hooks to be run when blocking. We implement BlockContext for the default ForkJoinWorkerThread in terms of ForkJoinPool.ManagedBlocker. - add public BlockContext.current and BlockContext.withBlockContext These allow an ExecutionContext or other code to override the BlockContext for the current thread. With this API, the BlockContext is customizable without creating a new pool of threads. BlockContext.current is needed to obtain the previous BlockContext before you push, so you can "chain up" to it if desired. BlockContext.withBlockContext is used to override the context for a given piece of code. - move isFutureThrowable into impl.Future - add implicitNotFound to ExecutionContext - remove default global EC from future {} and promise {} - add ExecutionContext.global for explicit use of the global default EC, replaces defaultExecutionContext - add a timeout to scala-concurrent-tck tests that block on SyncVar (so tests time out rather than hang) - insert blocking{} calls into concurrent tck to fix deadlocking - add NonFatal.apply and tests for NonFatal - add OnCompleteRunnable marker trait This would allow an ExecutionContext to distinguish a Runnable originating from Future.onComplete (all callbacks on Future end up going through onComplete). - rename ListenerRunnable to CallbackRunnable and use for KeptPromise too Just adds some clarity and consistency.
1 parent e9afc22 commit 4496c5d

15 files changed

+478
-321
lines changed

src/library/scala/collection/parallel/TaskSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
4848
* By default, parallel collections are parametrized with this task support object, so parallel collections
4949
* share the same execution context backend as the rest of the `scala.concurrent` package.
5050
*/
51-
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
51+
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
5252
extends TaskSupport with ExecutionContextTasks
5353

5454

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
9+
package scala.concurrent
10+
11+
import java.lang.Thread
12+
import scala.concurrent.util.Duration
13+
14+
/**
15+
* A context to be notified by `scala.concurrent.blocking()` when
16+
* a thread is about to block. In effect this trait provides
17+
* the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
18+
* locates an instance of `BlockContext` by first looking for one
19+
* provided through `BlockContext.withBlockContext()` and failing that,
20+
* checking whether `Thread.currentThread` is an instance of `BlockContext`.
21+
* So a thread pool can have its `java.lang.Thread` instances implement
22+
* `BlockContext`. There's a default `BlockContext` used if the thread
23+
* doesn't implement `BlockContext`.
24+
*
25+
* Typically, you'll want to chain to the previous `BlockContext`,
26+
* like this:
27+
* {{{
28+
* val oldContext = BlockContext.current
29+
* val myContext = new BlockContext {
30+
* override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
31+
* // you'd have code here doing whatever you need to do
32+
* // when the thread is about to block.
33+
* // Then you'd chain to the previous context:
34+
* oldContext.internalBlockingCall(awaitable, atMost)
35+
* }
36+
* }
37+
* BlockContext.withBlockContext(myContext) {
38+
* // then this block runs with myContext as the handler
39+
* // for scala.concurrent.blocking
40+
* }
41+
* }}}
42+
*/
43+
trait BlockContext {
44+
45+
/** Used internally by the framework; blocks execution for at most
46+
* `atMost` time while waiting for an `awaitable` object to become ready.
47+
*
48+
* Clients should use `scala.concurrent.blocking` instead; this is
49+
* the implementation of `scala.concurrent.blocking`, generally
50+
* provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
51+
*/
52+
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
53+
}
54+
55+
object BlockContext {
56+
private object DefaultBlockContext extends BlockContext {
57+
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
58+
awaitable.result(atMost)(Await.canAwaitEvidence)
59+
}
60+
61+
private val contextLocal = new ThreadLocal[BlockContext]() {
62+
override def initialValue = Thread.currentThread match {
63+
case ctx: BlockContext => ctx
64+
case _ => DefaultBlockContext
65+
}
66+
}
67+
68+
/** Obtain the current thread's current `BlockContext`. */
69+
def current: BlockContext = contextLocal.get
70+
71+
/** Pushes a current `BlockContext` while executing `body`. */
72+
def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
73+
val old = contextLocal.get
74+
try {
75+
contextLocal.set(blockContext)
76+
body
77+
} finally {
78+
contextLocal.set(old)
79+
}
80+
}
81+
}

src/library/scala/concurrent/ConcurrentPackageObject.scala

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,6 @@ import language.implicitConversions
1717
/** This package object contains primitives for concurrent and parallel programming.
1818
*/
1919
abstract class ConcurrentPackageObject {
20-
/** A global execution environment for executing lightweight tasks.
21-
*/
22-
lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
23-
24-
val currentExecutionContext = new ThreadLocal[ExecutionContext]
25-
26-
val handledFutureException: PartialFunction[Throwable, Throwable] = {
27-
case t: Throwable if isFutureThrowable(t) => t
28-
}
29-
30-
// TODO rename appropriately and make public
31-
private[concurrent] def isFutureThrowable(t: Throwable) = t match {
32-
case e: Error => false
33-
case t: scala.util.control.ControlThrowable => false
34-
case i: InterruptedException => false
35-
case _ => true
36-
}
3720

3821
/* concurrency constructs */
3922

@@ -46,17 +29,15 @@ abstract class ConcurrentPackageObject {
4629
* @param execctx the execution context on which the future is run
4730
* @return the `Future` holding the result of the computation
4831
*/
49-
def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
50-
Future[T](body)
32+
def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
5133

5234
/** Creates a promise object which can be completed with a value.
5335
*
5436
* @tparam T the type of the value in the promise
5537
* @param execctx the execution context on which the promise is created on
5638
* @return the newly created `Promise` object
5739
*/
58-
def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
59-
Promise[T]()
40+
def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
6041

6142
/** Used to block on a piece of code which potentially blocks.
6243
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
6748
* - InterruptedException - in the case that a wait within the blockable object was interrupted
6849
* - TimeoutException - in the case that the blockable object timed out
6950
*/
70-
def blocking[T](body: =>T): T =
71-
blocking(impl.Future.body2awaitable(body), Duration.Inf)
51+
def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
7252

7353
/** Blocks on an awaitable object.
7454
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
7959
* - InterruptedException - in the case that a wait within the blockable object was interrupted
8060
* - TimeoutException - in the case that the blockable object timed out
8161
*/
82-
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
83-
currentExecutionContext.get match {
84-
case null => awaitable.result(atMost)(Await.canAwaitEvidence)
85-
case ec => ec.internalBlockingCall(awaitable, atMost)
86-
}
87-
}
62+
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
63+
BlockContext.current.internalBlockingCall(awaitable, atMost)
8864

8965
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
9066
}

src/library/scala/concurrent/DelayedLazyVal.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ package scala.concurrent
2323
* @author Paul Phillips
2424
* @version 2.8
2525
*/
26-
class DelayedLazyVal[T](f: () => T, body: => Unit) {
26+
class DelayedLazyVal[T](f: () => T, body: => Unit){
2727
@volatile private[this] var _isDone = false
2828
private[this] lazy val complete = f()
2929

@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
3939
*/
4040
def apply(): T = if (isDone) complete else f()
4141

42-
// TODO replace with scala.concurrent.future { ... }
42+
// FIXME need to take ExecutionContext in constructor
43+
import ExecutionContext.Implicits.global
4344
future {
4445
body
4546
_isDone = true

src/library/scala/concurrent/ExecutionContext.scala

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,58 +9,80 @@
99
package scala.concurrent
1010

1111

12-
13-
import java.util.concurrent.atomic.{ AtomicInteger }
14-
import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
12+
import java.util.concurrent.{ ExecutorService, Executor }
1513
import scala.concurrent.util.Duration
16-
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
17-
import scala.collection.generic.CanBuildFrom
18-
import collection._
19-
20-
14+
import scala.annotation.implicitNotFound
2115

16+
/**
17+
* An `ExecutionContext` is an abstraction over an entity that can execute program logic.
18+
*/
19+
@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
2220
trait ExecutionContext {
2321

2422
/** Runs a block of code on this execution context.
2523
*/
2624
def execute(runnable: Runnable): Unit
2725

28-
/** Used internally by the framework - blocks execution for at most `atMost` time while waiting
29-
* for an `awaitable` object to become ready.
30-
*
31-
* Clients should use `scala.concurrent.blocking` instead.
32-
*/
33-
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
34-
3526
/** Reports that an asynchronous computation failed.
3627
*/
3728
def reportFailure(t: Throwable): Unit
3829

3930
}
4031

32+
/**
33+
* Union interface since Java does not support union types
34+
*/
35+
trait ExecutionContextExecutor extends ExecutionContext with Executor
36+
37+
/**
38+
* Union interface since Java does not support union types
39+
*/
40+
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
41+
4142

4243
/** Contains factory methods for creating execution contexts.
4344
*/
4445
object ExecutionContext {
45-
46-
implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
47-
46+
/**
47+
* The `ExecutionContext` associated with the current `Thread`
48+
*/
49+
val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
50+
51+
/**
52+
* This is the explicit global ExecutionContext,
53+
* call this when you want to provide the global ExecutionContext explicitly
54+
*/
55+
def global: ExecutionContextExecutor = Implicits.global
56+
57+
object Implicits {
58+
/**
59+
* This is the implicit global ExecutionContext,
60+
* import this when you want to provide the global ExecutionContext implicitly
61+
*/
62+
implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
63+
}
64+
4865
/** Creates an `ExecutionContext` from the given `ExecutorService`.
4966
*/
50-
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
67+
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
5168
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
69+
70+
/** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
71+
*/
72+
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
5273

5374
/** Creates an `ExecutionContext` from the given `Executor`.
5475
*/
55-
def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
76+
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
5677
impl.ExecutionContextImpl.fromExecutor(e, reporter)
78+
79+
/** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
80+
*/
81+
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
5782

58-
def defaultReporter: Throwable => Unit = {
59-
// re-throwing `Error`s here causes an exception handling test to fail.
60-
//case e: Error => throw e
61-
case t => t.printStackTrace()
62-
}
63-
83+
/** The default reporter simply prints the stack trace of the `Throwable` to System.err.
84+
*/
85+
def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
6486
}
6587

6688

src/library/scala/concurrent/Future.scala

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package scala.concurrent
1010

11-
11+
import language.higherKinds
1212

1313
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
1414
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
2323
import scala.util.{Try, Success, Failure}
2424

2525
import scala.annotation.tailrec
26-
import scala.collection.mutable.Stack
2726
import scala.collection.mutable.Builder
2827
import scala.collection.generic.CanBuildFrom
2928
import scala.reflect.ClassTag
30-
import language.higherKinds
3129

3230

3331

@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
138136
* $callbackInContext
139137
*/
140138
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
141-
case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
139+
case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
142140
case _ =>
143141
}(executor)
144142

@@ -580,6 +578,20 @@ object Future {
580578
classOf[Double] -> classOf[jl.Double],
581579
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
582580
)
581+
582+
/** Creates an already completed Future with the specified exception.
583+
*
584+
* @tparam T the type of the value in the future
585+
* @return the newly created `Future` object
586+
*/
587+
def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
588+
589+
/** Creates an already completed Future with the specified result.
590+
*
591+
* @tparam T the type of the value in the future
592+
* @return the newly created `Future` object
593+
*/
594+
def successful[T](result: T): Future[T] = Promise.successful(result).future
583595

584596
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
585597
*
@@ -710,5 +722,12 @@ object Future {
710722
}
711723
}
712724

713-
725+
/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
726+
* wraps a callback provided to `Future.onComplete`.
727+
* All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
728+
* `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
729+
*/
730+
trait OnCompleteRunnable {
731+
self: Runnable =>
732+
}
714733

src/library/scala/concurrent/Promise.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ trait Promise[T] {
3434
*/
3535
def future: Future[T]
3636

37+
/** Returns whether the promise has already been completed with
38+
* a value or an exception.
39+
*
40+
* $nonDeterministic
41+
*
42+
* @return `true` if the promise is already completed, `false` otherwise
43+
*/
44+
def isCompleted: Boolean
45+
3746
/** Completes the promise with either an exception or a value.
3847
*
3948
* @param result Either the value or the exception to complete the promise with.

0 commit comments

Comments
 (0)