Skip to content

Commit 00a0767

Browse files
Explain SafeCollector hackery (#3811)
Co-authored-by: Dmitry Khalanskiy <[email protected]>
1 parent 9a98eab commit 00a0767

File tree

2 files changed

+38
-6
lines changed

2 files changed

+38
-6
lines changed

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import kotlinx.coroutines.internal.ScopeCoroutine
1010
import kotlin.coroutines.*
1111
import kotlin.jvm.*
1212

13+
// Collector that ensures exception transparency and context preservation on a best-effort basis.
14+
// See an explanation in SafeCollector JVM actualization.
1315
internal expect class SafeCollector<T>(
1416
collector: FlowCollector<T>,
1517
collectContext: CoroutineContext

kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,25 @@ import kotlin.coroutines.jvm.internal.*
1313
@Suppress("UNCHECKED_CAST")
1414
private val emitFun =
1515
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
16-
/*
17-
* Implementor of ContinuationImpl (that will be preserved as ABI nearly forever)
18-
* in order to properly control 'intercepted()' lifecycle.
16+
17+
/**
18+
* A safe collector is an instance of [FlowCollector] that ensures that neither context preservation
19+
* nor exception transparency invariants are broken. Instances of [SafeCollector] are used in flow
20+
* operators that provide raw access to the [FlowCollector] e.g. [Flow.transform].
21+
* Mechanically, each [emit] call captures [currentCoroutineContext], ensures it is not different from the
22+
* previously caught one and proceeds further. If an exception is thrown from the downstream,
23+
* it is caught, and any further attempts to [emit] lead to the [IllegalStateException].
24+
*
25+
* ### Performance hacks
26+
*
27+
* Implementor of [ContinuationImpl] (that will be preserved as ABI nearly forever)
28+
* in order to properly control `intercepted()` lifecycle.
29+
* The safe collector implements [ContinuationImpl] to pretend it *is* a state-machine of its own `emit` method.
30+
* It is [ContinuationImpl] and not any other [Continuation] subclass because only [ContinuationImpl] supports `intercepted()` caching.
31+
* This is the most performance-sensitive place in the overall flow pipeline, because otherwise safe collector is forced to allocate
32+
* a state machine on each element being emitted for each intermediate stage where the safe collector is present.
33+
*
34+
* See a comment to [emit] for the explanation of what and how is being optimized.
1935
*/
2036
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
2137
internal actual class SafeCollector<T> actual constructor(
@@ -56,11 +72,15 @@ internal actual class SafeCollector<T> actual constructor(
5672

5773
/**
5874
* This is a crafty implementation of state-machine reusing.
59-
* First it checks that it is not used concurrently (which we explicitly prohibit) and
60-
* then just cache an instance of the completion_ in order to avoid extra allocation on each emit,
75+
*
76+
* First it checks that it is not used concurrently (which we explicitly prohibit), and
77+
* then just caches an instance of the completion_ in order to avoid extra allocation on each emit,
6178
* making it effectively garbage-free on its hot-path.
79+
*
80+
* See `emit` overload.
6281
*/
6382
actual override suspend fun emit(value: T) {
83+
// NB: it is a tail-call, so we are sure `uCont` is the completion of the emit's **caller**.
6484
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
6585
try {
6686
emit(uCont, value)
@@ -74,10 +94,20 @@ internal actual class SafeCollector<T> actual constructor(
7494
}
7595
}
7696

97+
/**
98+
* Here we use the following trick:
99+
* - Perform all the required checks
100+
* - Having a non-intercepted, non-cancellable caller's `uCont`, we leverage our implementation knowledge
101+
* and invoke `collector.emit(T)` as `collector.emit(value: T, completion: Continuation), passing `this`
102+
* as the completion. We also setup `this` state, so if the `completion.resume` is invoked, we are
103+
* invoking `uCont.resume` properly in accordance with `ContinuationImpl`/`BaseContinuationImpl` internal invariants.
104+
*
105+
* Note that in such scenarios, `collector.emit` completion is the current instance of SafeCollector and thus is reused.
106+
*/
77107
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
78108
val currentContext = uCont.context
79109
currentContext.ensureActive()
80-
// This check is triggered once per flow on happy path.
110+
// This check is triggered once per flow on a happy path.
81111
val previousContext = lastEmissionContext
82112
if (previousContext !== currentContext) {
83113
checkContext(currentContext, previousContext, value)

0 commit comments

Comments
 (0)