Skip to content

Handle subscription init timeout gracefully #1915

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

pdambrauskas
Copy link
Contributor

📝 Description

When using Spring Server client tries to initiate subscription, but timeout occurs before sending connection_init message Kotlin coroutine fails with exception:

2024-01-22T11:14:05.757+02:00  INFO 20308 --- [           main] c.p.ApplicationKt   : Started ApplicationKt in 2.86 seconds (process running for 3.29)
Exception in thread "DefaultDispatcher-worker-2" java.util.NoSuchElementException: No value received via onNext for awaitFirst
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$2$1.onComplete(Await.kt:282)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:726)
	at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:281)
	at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:355)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
	at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@2f9aebd2, Dispatchers.Default]

This small change fixes the problem, I've also added a test to cover this specific case.

@pdambrauskas pdambrauskas force-pushed the do_not_fail_on_init_timeout branch from 85351d3 to 7c26eb2 Compare January 22, 2024 17:40
@samuelAndalon samuelAndalon self-requested a review January 25, 2024 02:12
@dariuszkuc dariuszkuc merged commit 1d413a4 into ExpediaGroup:master Jan 25, 2024
@pdambrauskas pdambrauskas deleted the do_not_fail_on_init_timeout branch January 25, 2024 06:35
@dariuszkuc dariuszkuc added the changes: patch Changes require a patch version label Feb 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
changes: patch Changes require a patch version
Development

Successfully merging this pull request may close these issues.

3 participants