Skip to content

Commit b08de29

Browse files
author
Adriaan Moors
committed
Merge pull request scala#856 from havocp/sip14-execution-changes
Collection of updates to SIP-14 (scala.concurrent)
2 parents 9a7546d + 4496c5d commit b08de29

15 files changed

+478
-321
lines changed

src/library/scala/collection/parallel/TaskSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
4848
* By default, parallel collections are parametrized with this task support object, so parallel collections
4949
* share the same execution context backend as the rest of the `scala.concurrent` package.
5050
*/
51-
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
51+
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
5252
extends TaskSupport with ExecutionContextTasks
5353

5454

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
9+
package scala.concurrent
10+
11+
import java.lang.Thread
12+
import scala.concurrent.util.Duration
13+
14+
/**
15+
* A context to be notified by `scala.concurrent.blocking()` when
16+
* a thread is about to block. In effect this trait provides
17+
* the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
18+
* locates an instance of `BlockContext` by first looking for one
19+
* provided through `BlockContext.withBlockContext()` and failing that,
20+
* checking whether `Thread.currentThread` is an instance of `BlockContext`.
21+
* So a thread pool can have its `java.lang.Thread` instances implement
22+
* `BlockContext`. There's a default `BlockContext` used if the thread
23+
* doesn't implement `BlockContext`.
24+
*
25+
* Typically, you'll want to chain to the previous `BlockContext`,
26+
* like this:
27+
* {{{
28+
* val oldContext = BlockContext.current
29+
* val myContext = new BlockContext {
30+
* override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
31+
* // you'd have code here doing whatever you need to do
32+
* // when the thread is about to block.
33+
* // Then you'd chain to the previous context:
34+
* oldContext.internalBlockingCall(awaitable, atMost)
35+
* }
36+
* }
37+
* BlockContext.withBlockContext(myContext) {
38+
* // then this block runs with myContext as the handler
39+
* // for scala.concurrent.blocking
40+
* }
41+
* }}}
42+
*/
43+
trait BlockContext {
44+
45+
/** Used internally by the framework; blocks execution for at most
46+
* `atMost` time while waiting for an `awaitable` object to become ready.
47+
*
48+
* Clients should use `scala.concurrent.blocking` instead; this is
49+
* the implementation of `scala.concurrent.blocking`, generally
50+
* provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
51+
*/
52+
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
53+
}
54+
55+
object BlockContext {
56+
private object DefaultBlockContext extends BlockContext {
57+
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
58+
awaitable.result(atMost)(Await.canAwaitEvidence)
59+
}
60+
61+
private val contextLocal = new ThreadLocal[BlockContext]() {
62+
override def initialValue = Thread.currentThread match {
63+
case ctx: BlockContext => ctx
64+
case _ => DefaultBlockContext
65+
}
66+
}
67+
68+
/** Obtain the current thread's current `BlockContext`. */
69+
def current: BlockContext = contextLocal.get
70+
71+
/** Pushes a current `BlockContext` while executing `body`. */
72+
def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
73+
val old = contextLocal.get
74+
try {
75+
contextLocal.set(blockContext)
76+
body
77+
} finally {
78+
contextLocal.set(old)
79+
}
80+
}
81+
}

src/library/scala/concurrent/ConcurrentPackageObject.scala

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,6 @@ import language.implicitConversions
1717
/** This package object contains primitives for concurrent and parallel programming.
1818
*/
1919
abstract class ConcurrentPackageObject {
20-
/** A global execution environment for executing lightweight tasks.
21-
*/
22-
lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
23-
24-
val currentExecutionContext = new ThreadLocal[ExecutionContext]
25-
26-
val handledFutureException: PartialFunction[Throwable, Throwable] = {
27-
case t: Throwable if isFutureThrowable(t) => t
28-
}
29-
30-
// TODO rename appropriately and make public
31-
private[concurrent] def isFutureThrowable(t: Throwable) = t match {
32-
case e: Error => false
33-
case t: scala.util.control.ControlThrowable => false
34-
case i: InterruptedException => false
35-
case _ => true
36-
}
3720

3821
/* concurrency constructs */
3922

@@ -46,17 +29,15 @@ abstract class ConcurrentPackageObject {
4629
* @param execctx the execution context on which the future is run
4730
* @return the `Future` holding the result of the computation
4831
*/
49-
def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
50-
Future[T](body)
32+
def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
5133

5234
/** Creates a promise object which can be completed with a value.
5335
*
5436
* @tparam T the type of the value in the promise
5537
* @param execctx the execution context on which the promise is created on
5638
* @return the newly created `Promise` object
5739
*/
58-
def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
59-
Promise[T]()
40+
def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
6041

6142
/** Used to block on a piece of code which potentially blocks.
6243
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
6748
* - InterruptedException - in the case that a wait within the blockable object was interrupted
6849
* - TimeoutException - in the case that the blockable object timed out
6950
*/
70-
def blocking[T](body: =>T): T =
71-
blocking(impl.Future.body2awaitable(body), Duration.Inf)
51+
def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
7252

7353
/** Blocks on an awaitable object.
7454
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
7959
* - InterruptedException - in the case that a wait within the blockable object was interrupted
8060
* - TimeoutException - in the case that the blockable object timed out
8161
*/
82-
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
83-
currentExecutionContext.get match {
84-
case null => awaitable.result(atMost)(Await.canAwaitEvidence)
85-
case ec => ec.internalBlockingCall(awaitable, atMost)
86-
}
87-
}
62+
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
63+
BlockContext.current.internalBlockingCall(awaitable, atMost)
8864

8965
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
9066
}

src/library/scala/concurrent/DelayedLazyVal.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ package scala.concurrent
2323
* @author Paul Phillips
2424
* @version 2.8
2525
*/
26-
class DelayedLazyVal[T](f: () => T, body: => Unit) {
26+
class DelayedLazyVal[T](f: () => T, body: => Unit){
2727
@volatile private[this] var _isDone = false
2828
private[this] lazy val complete = f()
2929

@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
3939
*/
4040
def apply(): T = if (isDone) complete else f()
4141

42-
// TODO replace with scala.concurrent.future { ... }
42+
// FIXME need to take ExecutionContext in constructor
43+
import ExecutionContext.Implicits.global
4344
future {
4445
body
4546
_isDone = true

src/library/scala/concurrent/ExecutionContext.scala

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,58 +9,80 @@
99
package scala.concurrent
1010

1111

12-
13-
import java.util.concurrent.atomic.{ AtomicInteger }
14-
import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
12+
import java.util.concurrent.{ ExecutorService, Executor }
1513
import scala.concurrent.util.Duration
16-
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
17-
import scala.collection.generic.CanBuildFrom
18-
import collection._
19-
20-
14+
import scala.annotation.implicitNotFound
2115

16+
/**
17+
* An `ExecutionContext` is an abstraction over an entity that can execute program logic.
18+
*/
19+
@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
2220
trait ExecutionContext {
2321

2422
/** Runs a block of code on this execution context.
2523
*/
2624
def execute(runnable: Runnable): Unit
2725

28-
/** Used internally by the framework - blocks execution for at most `atMost` time while waiting
29-
* for an `awaitable` object to become ready.
30-
*
31-
* Clients should use `scala.concurrent.blocking` instead.
32-
*/
33-
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
34-
3526
/** Reports that an asynchronous computation failed.
3627
*/
3728
def reportFailure(t: Throwable): Unit
3829

3930
}
4031

32+
/**
33+
* Union interface since Java does not support union types
34+
*/
35+
trait ExecutionContextExecutor extends ExecutionContext with Executor
36+
37+
/**
38+
* Union interface since Java does not support union types
39+
*/
40+
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
41+
4142

4243
/** Contains factory methods for creating execution contexts.
4344
*/
4445
object ExecutionContext {
45-
46-
implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
47-
46+
/**
47+
* The `ExecutionContext` associated with the current `Thread`
48+
*/
49+
val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
50+
51+
/**
52+
* This is the explicit global ExecutionContext,
53+
* call this when you want to provide the global ExecutionContext explicitly
54+
*/
55+
def global: ExecutionContextExecutor = Implicits.global
56+
57+
object Implicits {
58+
/**
59+
* This is the implicit global ExecutionContext,
60+
* import this when you want to provide the global ExecutionContext implicitly
61+
*/
62+
implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
63+
}
64+
4865
/** Creates an `ExecutionContext` from the given `ExecutorService`.
4966
*/
50-
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
67+
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
5168
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
69+
70+
/** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
71+
*/
72+
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
5273

5374
/** Creates an `ExecutionContext` from the given `Executor`.
5475
*/
55-
def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
76+
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
5677
impl.ExecutionContextImpl.fromExecutor(e, reporter)
78+
79+
/** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
80+
*/
81+
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
5782

58-
def defaultReporter: Throwable => Unit = {
59-
// re-throwing `Error`s here causes an exception handling test to fail.
60-
//case e: Error => throw e
61-
case t => t.printStackTrace()
62-
}
63-
83+
/** The default reporter simply prints the stack trace of the `Throwable` to System.err.
84+
*/
85+
def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
6486
}
6587

6688

src/library/scala/concurrent/Future.scala

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package scala.concurrent
1010

11-
11+
import language.higherKinds
1212

1313
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
1414
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
2323
import scala.util.{Try, Success, Failure}
2424

2525
import scala.annotation.tailrec
26-
import scala.collection.mutable.Stack
2726
import scala.collection.mutable.Builder
2827
import scala.collection.generic.CanBuildFrom
2928
import scala.reflect.ClassTag
30-
import language.higherKinds
3129

3230

3331

@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
138136
* $callbackInContext
139137
*/
140138
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
141-
case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
139+
case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
142140
case _ =>
143141
}(executor)
144142

@@ -580,6 +578,20 @@ object Future {
580578
classOf[Double] -> classOf[jl.Double],
581579
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
582580
)
581+
582+
/** Creates an already completed Future with the specified exception.
583+
*
584+
* @tparam T the type of the value in the future
585+
* @return the newly created `Future` object
586+
*/
587+
def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
588+
589+
/** Creates an already completed Future with the specified result.
590+
*
591+
* @tparam T the type of the value in the future
592+
* @return the newly created `Future` object
593+
*/
594+
def successful[T](result: T): Future[T] = Promise.successful(result).future
583595

584596
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
585597
*
@@ -710,5 +722,12 @@ object Future {
710722
}
711723
}
712724

713-
725+
/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
726+
* wraps a callback provided to `Future.onComplete`.
727+
* All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
728+
* `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
729+
*/
730+
trait OnCompleteRunnable {
731+
self: Runnable =>
732+
}
714733

src/library/scala/concurrent/Promise.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ trait Promise[T] {
3434
*/
3535
def future: Future[T]
3636

37+
/** Returns whether the promise has already been completed with
38+
* a value or an exception.
39+
*
40+
* $nonDeterministic
41+
*
42+
* @return `true` if the promise is already completed, `false` otherwise
43+
*/
44+
def isCompleted: Boolean
45+
3746
/** Completes the promise with either an exception or a value.
3847
*
3948
* @param result Either the value or the exception to complete the promise with.

0 commit comments

Comments
 (0)