Skip to content

Commit bfbd1a9

Browse files
authored
Fix Apollo subscription message handling (#371)
* Fix Apollo subscription message handling * Implement graphql-ws spec Here is the spec https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md * update spring config metadata * Remove unused config The config value for the packages is overriden in the hooks so we can remove it
1 parent 4a70469 commit bfbd1a9

File tree

12 files changed

+586
-26
lines changed

12 files changed

+586
-26
lines changed

detekt.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ complexity:
2929
ComplexInterface:
3030
threshold: 15
3131

32+
naming:
33+
FunctionMaxLength:
34+
maximumFunctionNameLength: 35
35+
3236
style:
3337
MagicNumber:
3438
ignoreEnums: true

examples/spring/src/main/kotlin/com/expediagroup/graphql/sample/subscriptions/SimpleSubscription.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package com.expediagroup.graphql.sample.subscriptions
1818

1919
import com.expediagroup.graphql.annotations.GraphQLDescription
2020
import com.expediagroup.graphql.spring.operations.Subscription
21+
import kotlinx.coroutines.flow.flowOf
22+
import kotlinx.coroutines.reactive.asPublisher
23+
import org.reactivestreams.Publisher
2124
import org.springframework.stereotype.Component
2225
import reactor.core.publisher.Flux
2326
import reactor.core.publisher.Mono
@@ -32,4 +35,17 @@ class SimpleSubscription : Subscription {
3235

3336
@GraphQLDescription("Returns a random number every second")
3437
fun counter(): Flux<Int> = Flux.interval(Duration.ofSeconds(1)).map { Random.nextInt() }
38+
39+
@GraphQLDescription("Returns a random number every second, errors if even")
40+
fun counterWithError(): Flux<Int> = Flux.interval(Duration.ofSeconds(1))
41+
.map {
42+
val value = Random.nextInt()
43+
if (value % 2 == 0) {
44+
throw Exception("Value is even $value")
45+
}
46+
else value
47+
}
48+
49+
@GraphQLDescription("Returns list of values")
50+
fun flow(): Publisher<Int> = flowOf(1, 2, 4).asPublisher()
3551
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
graphql:
2+
subscriptions:
3+
# Send a ka message every 1000 ms (1 second)
4+
keepAliveInterval: 1000

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/GraphQLConfigurationProperties.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,6 @@ class FederationConfigurationProperties {
4545
class SubscriptionConfigurationProperties {
4646
/** GraphQL subscriptions endpoint, defaults to 'subscriptions' */
4747
var endpoint: String = "subscriptions"
48+
/** Keep the websocket alive and send a message to the client every interval in ms. Default to not sending messages */
49+
var keepAliveInterval: Long? = null
4850
}

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/SubscriptionAutoConfiguration.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.expediagroup.graphql.spring
1818

19+
import com.expediagroup.graphql.spring.execution.ApolloSubscriptionProtocolHandler
1920
import com.expediagroup.graphql.spring.execution.SimpleSubscriptionHandler
2021
import com.expediagroup.graphql.spring.execution.SubscriptionHandler
2122
import com.expediagroup.graphql.spring.execution.SubscriptionWebSocketHandler
@@ -47,7 +48,12 @@ class SubscriptionAutoConfiguration {
4748
fun websocketHandlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()
4849

4950
@Bean
50-
fun subscriptionWebSocketHandler(handler: SubscriptionHandler, objectMapper: ObjectMapper) = SubscriptionWebSocketHandler(handler, objectMapper)
51+
fun apolloSubscriptionProtocolHandler(config: GraphQLConfigurationProperties, handler: SubscriptionHandler, objectMapper: ObjectMapper) =
52+
ApolloSubscriptionProtocolHandler(config, handler, objectMapper)
53+
54+
@Bean
55+
fun subscriptionWebSocketHandler(handler: ApolloSubscriptionProtocolHandler, objectMapper: ObjectMapper) =
56+
SubscriptionWebSocketHandler(handler, objectMapper)
5157

5258
@Bean
5359
fun subscriptionHandlerMapping(config: GraphQLConfigurationProperties, subscriptionWebSocketHandler: SubscriptionWebSocketHandler): HandlerMapping =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2019 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.spring.exception
18+
19+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
20+
21+
class UknownSubscriptionOperationType(operationMessage: SubscriptionOperationMessage) :
22+
IllegalArgumentException("Unknown subscription operation $operationMessage")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2019 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.spring.execution
18+
19+
import com.expediagroup.graphql.spring.GraphQLConfigurationProperties
20+
import com.expediagroup.graphql.spring.model.GraphQLRequest
21+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
22+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT
23+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_TERMINATE
24+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_START
25+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_STOP
26+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
27+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_CONNECTION_ACK
28+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_CONNECTION_ERROR
29+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_CONNECTION_KEEP_ALIVE
30+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_DATA
31+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_ERROR
32+
import com.fasterxml.jackson.databind.ObjectMapper
33+
import com.fasterxml.jackson.module.kotlin.convertValue
34+
import com.fasterxml.jackson.module.kotlin.readValue
35+
import org.reactivestreams.Subscription
36+
import org.slf4j.LoggerFactory
37+
import org.springframework.web.reactive.socket.WebSocketSession
38+
import reactor.core.publisher.Flux
39+
import java.time.Duration
40+
import java.util.concurrent.ConcurrentHashMap
41+
42+
class ApolloSubscriptionProtocolHandler(
43+
private val config: GraphQLConfigurationProperties,
44+
private val subscriptionHandler: SubscriptionHandler,
45+
private val objectMapper: ObjectMapper
46+
) {
47+
// Keep Alive subscriptions are saved by web socket session id since they are sent on connection init
48+
private val keepAliveSubscriptions = ConcurrentHashMap<String, Subscription>()
49+
// Data subscriptions are saved by SubscriptionOperationMessage.id
50+
private val subscriptions = ConcurrentHashMap<String, Subscription>()
51+
52+
private val logger = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler::class.java)
53+
private val keepAliveMessage = SubscriptionOperationMessage(type = GQL_CONNECTION_KEEP_ALIVE.type)
54+
55+
@Suppress("Detekt.TooGenericExceptionCaught")
56+
fun handle(payload: String, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
57+
try {
58+
val operationMessage: SubscriptionOperationMessage = objectMapper.readValue(payload)
59+
60+
return when {
61+
operationMessage.type == GQL_CONNECTION_INIT.type -> {
62+
val flux = Flux.just(SubscriptionOperationMessage(GQL_CONNECTION_ACK.type))
63+
val keepAliveInterval = config.subscriptions.keepAliveInterval
64+
if (keepAliveInterval != null) {
65+
// Send the GQL_CONNECTION_KEEP_ALIVE message every interval until the connection is closed or terminated
66+
val keepAliveFlux = Flux.interval(Duration.ofMillis(keepAliveInterval))
67+
.map { keepAliveMessage }
68+
.doOnSubscribe {
69+
keepAliveSubscriptions[session.id] = it
70+
}
71+
return flux.concatWith(keepAliveFlux)
72+
}
73+
74+
return flux
75+
}
76+
operationMessage.type == GQL_START.type -> startSubscription(operationMessage, session)
77+
operationMessage.type == GQL_STOP.type -> {
78+
stopSubscription(operationMessage, session)
79+
Flux.empty()
80+
}
81+
operationMessage.type == GQL_CONNECTION_TERMINATE.type -> {
82+
stopSubscription(operationMessage, session)
83+
session.close()
84+
Flux.empty()
85+
}
86+
else -> {
87+
logger.error("Unknown subscription operation $operationMessage")
88+
Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
89+
}
90+
}
91+
} catch (exception: Exception) {
92+
logger.error("Error parsing the subscription message", exception)
93+
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type))
94+
}
95+
}
96+
97+
@Suppress("Detekt.TooGenericExceptionCaught")
98+
private fun startSubscription(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
99+
if (operationMessage.id == null) {
100+
logger.error("Operation id is required")
101+
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type))
102+
}
103+
104+
val payload = operationMessage.payload
105+
106+
if (payload == null) {
107+
logger.error("Payload was null instead of a GraphQLRequest object")
108+
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
109+
}
110+
111+
return try {
112+
val request = objectMapper.convertValue<GraphQLRequest>(payload)
113+
subscriptionHandler.executeSubscription(request)
114+
.map {
115+
if (it.errors?.isNotEmpty() == true) {
116+
SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it)
117+
} else {
118+
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
119+
}
120+
}
121+
.concatWith(Flux.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = operationMessage.id)))
122+
.doOnSubscribe {
123+
logger.trace("WebSocket GraphQL subscription subscribe, WebSocketSessionID=${session.id} OperationMessageID=${operationMessage.id}")
124+
subscriptions[operationMessage.id] = it
125+
}
126+
.doOnCancel { logger.trace("WebSocket GraphQL subscription cancel, WebSocketSessionID=${session.id} OperationMessageID=${operationMessage.id}") }
127+
.doOnComplete { logger.trace("WebSocket GraphQL subscription complete, WebSocketSessionID=${session.id} OperationMessageID=${operationMessage.id}") }
128+
} catch (exception: Exception) {
129+
logger.error("Error running graphql subscription", exception)
130+
Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
131+
}
132+
}
133+
134+
private fun stopSubscription(operationMessage: SubscriptionOperationMessage, session: WebSocketSession) {
135+
if (operationMessage.id != null) {
136+
keepAliveSubscriptions[session.id]?.cancel()
137+
subscriptions[operationMessage.id]?.cancel()
138+
}
139+
}
140+
}

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/SubscriptionWebSocketHandler.kt

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package com.expediagroup.graphql.spring.execution
1818

19-
import com.expediagroup.graphql.spring.model.GraphQLRequest
2019
import com.fasterxml.jackson.databind.ObjectMapper
21-
import com.fasterxml.jackson.module.kotlin.readValue
22-
import org.slf4j.LoggerFactory
2320
import org.springframework.web.reactive.socket.WebSocketHandler
2421
import org.springframework.web.reactive.socket.WebSocketSession
2522
import reactor.core.publisher.Mono
@@ -28,23 +25,14 @@ import reactor.core.publisher.Mono
2825
* Default WebSocket handler for handling GraphQL subscriptions.
2926
*/
3027
class SubscriptionWebSocketHandler(
31-
private val subscriptionHandler: SubscriptionHandler,
28+
private val subscriptionHandler: ApolloSubscriptionProtocolHandler,
3229
private val objectMapper: ObjectMapper
3330
) : WebSocketHandler {
3431

35-
private val logger = LoggerFactory.getLogger(SubscriptionWebSocketHandler::class.java)
36-
3732
@Suppress("ForbiddenVoid")
3833
override fun handle(session: WebSocketSession): Mono<Void> {
3934
val response = session.receive()
40-
.concatMap {
41-
val graphQLRequest = objectMapper.readValue<GraphQLRequest>(it.payloadAsText)
42-
subscriptionHandler.executeSubscription(graphQLRequest)
43-
.doOnSubscribe { logger.trace("WebSocket GraphQL subscription subscribe, ID=${session.id}") }
44-
.doOnCancel { logger.trace("WebSocket GraphQL subscription cancel, ID=${session.id}") }
45-
.doOnComplete { logger.trace("WebSocket GraphQL subscription complete, ID=${session.id}") }
46-
.doFinally { session.close() }
47-
}
35+
.flatMap { subscriptionHandler.handle(it.payloadAsText, session) }
4836
.map { objectMapper.writeValueAsString(it) }
4937
.map { session.textMessage(it) }
5038

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2019 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.spring.model
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
20+
21+
/**
22+
* The `graphql-ws` protocol from Apollo Client has some special text messages to signal events.
23+
* Along with the HTTP WebSocket event handling we need to have some extra logic
24+
*
25+
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
26+
*/
27+
@JsonIgnoreProperties(ignoreUnknown = true)
28+
data class SubscriptionOperationMessage(
29+
val type: String,
30+
val id: String? = null,
31+
val payload: Any? = null
32+
) {
33+
enum class ClientMessages(val type: String) {
34+
GQL_CONNECTION_INIT("connection_init"),
35+
GQL_START("start"),
36+
GQL_STOP("stop"),
37+
GQL_CONNECTION_TERMINATE("connection_terminate")
38+
}
39+
40+
enum class ServerMessages(val type: String) {
41+
GQL_CONNECTION_ACK("connection_ack"),
42+
GQL_CONNECTION_ERROR("connection_error"),
43+
GQL_DATA("data"),
44+
GQL_ERROR("error"),
45+
GQL_COMPLETE("complete"),
46+
GQL_CONNECTION_KEEP_ALIVE("ka")
47+
}
48+
}

graphql-kotlin-spring-server/src/main/resources/META-INF/spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
"type": "java.lang.String",
2222
"defaultValue": "subscriptions",
2323
"sourceType": "com.expediagroup.graphql.spring.SubscriptionConfigurationProperties"
24+
},
25+
{
26+
"name": "graphql.subscriptions.keepAliveInterval",
27+
"type": "java.lang.Long",
28+
"defaultValue": "null",
29+
"sourceType": "com.expediagroup.graphql.spring.SubscriptionConfigurationProperties"
2430
}
2531
]
2632
}

0 commit comments

Comments
 (0)