Skip to content

Commit 0a8fed9

Browse files
committed
AMMEND
1 parent eb4e399 commit 0a8fed9

File tree

3 files changed

+149
-261
lines changed

3 files changed

+149
-261
lines changed

kotlinx-coroutines-core/js/src/JSDispatcher.kt

Lines changed: 11 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,18 @@
44

55
package kotlinx.coroutines
66

7-
import kotlinx.coroutines.internal.*
87
import org.w3c.dom.*
9-
import kotlin.coroutines.*
108
import kotlin.js.Promise
119

12-
private const val MAX_DELAY = Int.MAX_VALUE.toLong()
10+
internal class ScheduledMessageQueue(private val dispatcher: SetTimeoutBasedDispatcher) : MessageQueue() {
11+
val processQueue: dynamic = { process() }
1312

14-
private fun delayToInt(timeMillis: Long): Int =
15-
timeMillis.coerceIn(0, MAX_DELAY).toInt()
16-
17-
internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
18-
inner class ScheduledMessageQueue : MessageQueue() {
19-
internal val processQueue: dynamic = { process() }
20-
21-
override fun schedule() {
22-
scheduleQueueProcessing()
23-
}
24-
25-
override fun reschedule() {
26-
setTimeout(processQueue, 0)
27-
}
28-
}
29-
30-
internal val messageQueue = ScheduledMessageQueue()
31-
32-
abstract fun scheduleQueueProcessing()
33-
34-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
35-
parallelism.checkParallelism()
36-
return this
37-
}
38-
39-
override fun dispatch(context: CoroutineContext, block: Runnable) {
40-
messageQueue.enqueue(block)
41-
}
42-
43-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
44-
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
45-
return ClearTimeout(handle)
13+
override fun schedule() {
14+
dispatcher.scheduleQueueProcessing()
4615
}
4716

48-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
49-
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
50-
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
17+
override fun reschedule() {
18+
setTimeout(processQueue, 0)
5119
}
5220
}
5321

@@ -57,48 +25,7 @@ internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
5725
}
5826
}
5927

60-
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
61-
override fun scheduleQueueProcessing() {
62-
setTimeout(messageQueue.processQueue, 0)
63-
}
64-
}
65-
66-
private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {
67-
68-
override fun dispose() {
69-
clearTimeout(handle)
70-
}
71-
72-
override fun invoke(cause: Throwable?) {
73-
dispose()
74-
}
75-
76-
override fun toString(): String = "ClearTimeout[$handle]"
77-
}
78-
79-
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
80-
private val queue = WindowMessageQueue(window)
81-
82-
override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)
83-
84-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
85-
val handle = window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
86-
continuation.invokeOnCancellation(handler = WindowClearTimeout(handle).asHandler)
87-
}
88-
89-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
90-
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
91-
return WindowClearTimeout(handle)
92-
}
93-
94-
private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
95-
override fun dispose() {
96-
window.clearTimeout(handle)
97-
}
98-
}
99-
}
100-
101-
private class WindowMessageQueue(private val window: Window) : MessageQueue() {
28+
internal class WindowMessageQueue(private val window: Window) : MessageQueue() {
10229
private val messageName = "dispatchCoroutine"
10330

10431
init {
@@ -119,52 +46,9 @@ private class WindowMessageQueue(private val window: Window) : MessageQueue() {
11946
}
12047
}
12148

122-
/**
123-
* An abstraction over JS scheduling mechanism that leverages micro-batching of dispatched blocks without
124-
* paying the cost of JS callbacks scheduling on every dispatch.
125-
*
126-
* Queue uses two scheduling mechanisms:
127-
* 1) [schedule] is used to schedule the initial processing of the message queue.
128-
* JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
129-
* 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
130-
* JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
131-
*
132-
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
133-
*/
134-
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
135-
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
136-
private var scheduled = false
137-
138-
abstract fun schedule()
139-
140-
abstract fun reschedule()
141-
142-
fun enqueue(element: Runnable) {
143-
add(element)
144-
if (!scheduled) {
145-
scheduled = true
146-
schedule()
147-
}
148-
}
149-
150-
fun process() {
151-
try {
152-
// limit number of processed messages
153-
repeat(yieldEvery) {
154-
val element = removeFirstOrNull() ?: return@process
155-
element.run()
156-
}
157-
} finally {
158-
if (isEmpty()) {
159-
scheduled = false
160-
} else {
161-
reschedule()
162-
}
163-
}
164-
}
165-
}
166-
16749
// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
16850
// using them via "window" (which only works in browser)
169-
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
170-
private external fun clearTimeout(handle: Int = definedExternally)
51+
internal external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
52+
internal external fun clearTimeout(handle: Int = definedExternally)
53+
internal fun setTimeout(window: WindowOrWorkerGlobalScope, handler: () -> Unit, timeout: Int): Int =
54+
window.setTimeout(handler, timeout)
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.*
8+
import org.w3c.dom.*
9+
import kotlin.coroutines.*
10+
11+
private const val MAX_DELAY = Int.MAX_VALUE.toLong()
12+
13+
private fun delayToInt(timeMillis: Long): Int =
14+
timeMillis.coerceIn(0, MAX_DELAY).toInt()
15+
16+
internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
17+
val messageQueue = ScheduledMessageQueue(this)
18+
19+
abstract fun scheduleQueueProcessing()
20+
21+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
22+
parallelism.checkParallelism()
23+
return this
24+
}
25+
26+
override fun dispatch(context: CoroutineContext, block: Runnable) {
27+
messageQueue.enqueue(block)
28+
}
29+
30+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
31+
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
32+
return ClearTimeout(handle)
33+
}
34+
35+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
36+
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
37+
continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
38+
}
39+
}
40+
41+
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
42+
private val queue = WindowMessageQueue(window)
43+
44+
override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)
45+
46+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
47+
val handle = setTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
48+
continuation.invokeOnCancellation(handler = WindowClearTimeout(handle).asHandler)
49+
}
50+
51+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
52+
val handle = setTimeout(window, block::run, delayToInt(timeMillis))
53+
return WindowClearTimeout(handle)
54+
}
55+
56+
private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
57+
override fun dispose() {
58+
window.clearTimeout(handle)
59+
}
60+
}
61+
}
62+
63+
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
64+
override fun scheduleQueueProcessing() {
65+
setTimeout(messageQueue.processQueue, 0)
66+
}
67+
}
68+
69+
private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {
70+
71+
override fun dispose() {
72+
clearTimeout(handle)
73+
}
74+
75+
override fun invoke(cause: Throwable?) {
76+
dispose()
77+
}
78+
79+
override fun toString(): String = "ClearTimeout[$handle]"
80+
}
81+
82+
83+
/**
84+
* An abstraction over JS scheduling mechanism that leverages micro-batching of dispatched blocks without
85+
* paying the cost of JS callbacks scheduling on every dispatch.
86+
*
87+
* Queue uses two scheduling mechanisms:
88+
* 1) [schedule] is used to schedule the initial processing of the message queue.
89+
* JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
90+
* 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
91+
* JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
92+
*
93+
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
94+
*/
95+
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
96+
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
97+
private var scheduled = false
98+
99+
abstract fun schedule()
100+
101+
abstract fun reschedule()
102+
103+
fun enqueue(element: Runnable) {
104+
add(element)
105+
if (!scheduled) {
106+
scheduled = true
107+
schedule()
108+
}
109+
}
110+
111+
fun process() {
112+
try {
113+
// limit number of processed messages
114+
repeat(yieldEvery) {
115+
val element = removeFirstOrNull() ?: return@process
116+
element.run()
117+
}
118+
} finally {
119+
if (isEmpty()) {
120+
scheduled = false
121+
} else {
122+
reschedule()
123+
}
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)