Skip to content

Commit 1d413a4

Browse files
authored
Handle subscription init timeout gracefully (#1915)
### 📝 Description When using Spring Server client tries to initiate subscription, but timeout occurs before sending `connection_init` message Kotlin coroutine fails with exception: ``` 2024-01-22T11:14:05.757+02:00 INFO 20308 --- [ main] c.p.ApplicationKt : Started ApplicationKt in 2.86 seconds (process running for 3.29) Exception in thread "DefaultDispatcher-worker-2" java.util.NoSuchElementException: No value received via onNext for awaitFirst at kotlinx.coroutines.reactive.AwaitKt$awaitOne$2$1.onComplete(Await.kt:282) at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123) at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:726) at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:281) at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:1583) Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@2f9aebd2, Dispatchers.Default] ``` This small change fixes the problem, I've also added a test to cover this specific case.
1 parent e6ab991 commit 1d413a4

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCO
2121
import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
2222
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
2323
import com.fasterxml.jackson.databind.ObjectMapper
24-
import kotlinx.coroutines.reactive.awaitFirst
24+
import kotlinx.coroutines.reactive.awaitFirstOrNull
2525
import kotlinx.coroutines.reactor.flux
2626
import org.springframework.web.reactive.socket.CloseStatus
2727
import org.springframework.web.reactive.socket.WebSocketHandler
@@ -53,7 +53,7 @@ class SubscriptionWebSocketHandler(
5353
)
5454

5555
override suspend fun closeSession(session: WebSocketSession, reason: GraphQLSubscriptionStatus) {
56-
session.close(CloseStatus(reason.code, reason.reason)).awaitFirst()
56+
session.close(CloseStatus(reason.code, reason.reason)).awaitFirstOrNull()
5757
}
5858

5959
override suspend fun sendSubscriptionMessage(session: WebSocketSession, message: String): WebSocketMessage =

servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerTest.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717
package com.expediagroup.graphql.server.spring.subscriptions
1818

1919
import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL
20+
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
2021
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
22+
import io.mockk.every
2123
import io.mockk.mockk
2224
import org.junit.jupiter.api.Test
2325
import kotlin.test.assertEquals
26+
import kotlinx.coroutines.test.runTest
27+
import org.junit.jupiter.api.assertDoesNotThrow
28+
import org.springframework.web.reactive.socket.WebSocketSession
29+
import reactor.core.publisher.Mono
2430

2531
class SubscriptionWebSocketHandlerTest {
2632

@@ -35,4 +41,16 @@ class SubscriptionWebSocketHandlerTest {
3541
val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
3642
assertEquals(expected = listOf(GRAPHQL_WS_PROTOCOL), actual = handler.subProtocols)
3743
}
44+
45+
@Test
46+
fun `verify default subscription handler handles init timeout gracefully`() = runTest {
47+
val handler = SubscriptionWebSocketHandler(mockk(), mockk(), mockk(), mockk(), 1_000, jacksonObjectMapper())
48+
val session = mockk<WebSocketSession>()
49+
50+
every { session.close(any()) } returns Mono.empty()
51+
52+
assertDoesNotThrow {
53+
handler.closeSession(session, GraphQLSubscriptionStatus.CONNECTION_INIT_TIMEOUT)
54+
}
55+
}
3856
}

0 commit comments

Comments
 (0)