Skip to content

Commit f1e21cb

Browse files
de55smyrick
authored andcommitted
Add Apollo Subscription Hooks (#544)
* Add Apollo Subscription Lifecycle Events (WIP): * switch lifecycle events to monos * adjust tests * update tests and add state variable for onConnect * add MySubscriptionHooks bean * fix lint issues * Update SubscriptionAutoConfiguration.kt Change apolloLifeCycleEvents to apolloSubscriptionHooks * add documentation and tests * reorder test * get rid of extra assertion in test * alter tests to reflect that flux won't play twice * fix lint * fix lint * default to using empty params in onConnect rather than empty mono * switch some tests to using StepVerifier
1 parent bf958d2 commit f1e21cb

File tree

11 files changed

+415
-82
lines changed

11 files changed

+415
-82
lines changed

docs/spring-server/subscriptions.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,27 @@ title: Subscriptions
77
To see more details of how to implement subscriptions in your schema, see [executing subscriptions](../execution/subscriptions).
88

99
## `graphql-ws` subprotocol
10+
###Overview
1011
We have implemented subscriptions in Spring WebSockets following the [`graphql-ws` subprotocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md) defined by Apollo. This requires that your client send and parse messages in a specific format.
1112

1213
You can see more details in the file [ApolloSubscriptionProtocolHandler](https://github.com/ExpediaGroup/graphql-kotlin/blob/master/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionProtocolHandler.kt).
1314

1415
If you would like to implement your own subscription handler, you can provide a primary spring bean for `HandlerMapping` that overrides the [default one](https://github.com/ExpediaGroup/graphql-kotlin/blob/master/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/SubscriptionAutoConfiguration.kt) which sets the url for subscriptions to the Apollo subscription handler.
1516

17+
### Subscription Hooks
18+
In line with the protocol, we have implemented hooks to execute functions at different stages of the connection lifecycle:
19+
- onConnect
20+
- onOperation
21+
- onOperationComplete
22+
- onDisconnect
23+
24+
You can see more details in the file [ApolloSubscriptionHooks](https://github.com/ExpediaGroup/graphql-kotlin/blob/master/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionHooks.kt).
25+
26+
If you would like to implement your own subscription hooks, you can provide a primary spring bean for `ApolloSubscriptionHooks` that overrides the [default one](https://github.com/ExpediaGroup/graphql-kotlin/blob/master/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/SubscriptionAutoConfiguration.kt) which do not perform any actions.
27+
28+
1629
## Example
1730
You can see an example implementation of a `Subscription` in the [example app](https://github.com/ExpediaGroup/graphql-kotlin/blob/master/examples/spring/src/main/kotlin/com/expediagroup/graphql/sample/subscriptions/SimpleSubscription.kt).
1831

1932

2033

21-

examples/spring/src/main/kotlin/com/expediagroup/graphql/examples/Application.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import com.expediagroup.graphql.examples.datafetchers.CustomDataFetcherFactoryPr
2121
import com.expediagroup.graphql.examples.datafetchers.SpringDataFetcherFactory
2222
import com.expediagroup.graphql.examples.directives.CustomDirectiveWiringFactory
2323
import com.expediagroup.graphql.examples.exceptions.CustomDataFetcherExceptionHandler
24+
import com.expediagroup.graphql.examples.execution.MySubscriptionHooks
2425
import com.expediagroup.graphql.examples.extension.CustomSchemaGeneratorHooks
26+
import com.expediagroup.graphql.spring.execution.ApolloSubscriptionHooks
2527
import com.fasterxml.jackson.databind.ObjectMapper
2628
import graphql.execution.DataFetcherExceptionHandler
2729
import org.springframework.boot.autoconfigure.SpringBootApplication
@@ -43,6 +45,9 @@ class Application {
4345

4446
@Bean
4547
fun dataFetcherExceptionHandler(): DataFetcherExceptionHandler = CustomDataFetcherExceptionHandler()
48+
49+
@Bean
50+
fun apolloSubscriptionHooks(): ApolloSubscriptionHooks = MySubscriptionHooks()
4651
}
4752

4853
fun main(args: Array<String>) {

examples/spring/src/main/kotlin/com/expediagroup/graphql/examples/context/MyGraphQLContext.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ import org.springframework.http.server.reactive.ServerHttpResponse
2323
/**
2424
* Simple [GraphQLContext] that holds extra value.
2525
*/
26-
class MyGraphQLContext(val myCustomValue: String, val request: ServerHttpRequest, val response: ServerHttpResponse)
26+
class MyGraphQLContext(val myCustomValue: String, val request: ServerHttpRequest, val response: ServerHttpResponse, var subscriptionValue: String? = null)

examples/spring/src/main/kotlin/com/expediagroup/graphql/examples/context/MyGraphQLContextFactory.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import org.springframework.stereotype.Component
1212
class MyGraphQLContextFactory : GraphQLContextFactory<MyGraphQLContext> {
1313

1414
override suspend fun generateContext(request: ServerHttpRequest, response: ServerHttpResponse): MyGraphQLContext = MyGraphQLContext(
15-
myCustomValue = request.headers.getFirst("MyHeader") ?: "defaultContext",
16-
request = request,
17-
response = response)
15+
myCustomValue = request.headers.getFirst("MyHeader") ?: "defaultContext",
16+
request = request,
17+
response = response,
18+
subscriptionValue = null)
1819
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.expediagroup.graphql.examples.execution
2+
3+
import com.expediagroup.graphql.examples.context.MyGraphQLContext
4+
import com.expediagroup.graphql.spring.execution.ApolloSubscriptionHooks
5+
import kotlinx.coroutines.reactor.mono
6+
import org.springframework.web.reactive.socket.WebSocketSession
7+
import reactor.core.publisher.Mono
8+
9+
/**
10+
* A simple implementation of Apollo Subscription Lifecycle Events.
11+
*/
12+
open class MySubscriptionHooks :
13+
ApolloSubscriptionHooks {
14+
override fun onConnect(
15+
connectionParams: Map<String, String>,
16+
session: WebSocketSession,
17+
graphQLContext: Any?
18+
): Mono<Unit> = mono {
19+
val bearer = connectionParams["Authorization"] ?: "none"
20+
val context = graphQLContext as? MyGraphQLContext
21+
context?.subscriptionValue = bearer
22+
}
23+
}

examples/spring/src/main/kotlin/com/expediagroup/graphql/examples/subscriptions/SimpleSubscription.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.expediagroup.graphql.examples.subscriptions
1818

19+
import com.expediagroup.graphql.annotations.GraphQLContext
1920
import com.expediagroup.graphql.annotations.GraphQLDescription
21+
import com.expediagroup.graphql.examples.context.MyGraphQLContext
2022
import com.expediagroup.graphql.spring.operations.Subscription
2123
import kotlinx.coroutines.flow.flowOf
2224
import kotlinx.coroutines.reactive.asPublisher
@@ -59,4 +61,8 @@ class SimpleSubscription : Subscription {
5961

6062
@GraphQLDescription("Returns list of values")
6163
fun flow(): Publisher<Int> = flowOf(1, 2, 4).asPublisher()
64+
65+
@GraphQLDescription("Returns a value from the subscription context")
66+
fun subscriptionContext(@GraphQLContext myGraphQLContext: MyGraphQLContext): Publisher<String> =
67+
flowOf(myGraphQLContext.subscriptionValue ?: "", "value 2", "value3").asPublisher()
6268
}

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/SubscriptionAutoConfiguration.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package com.expediagroup.graphql.spring
1818

19+
import com.expediagroup.graphql.spring.execution.ApolloSubscriptionHooks
1920
import com.expediagroup.graphql.spring.execution.ApolloSubscriptionProtocolHandler
2021
import com.expediagroup.graphql.spring.execution.SimpleSubscriptionHandler
22+
import com.expediagroup.graphql.spring.execution.SimpleSubscriptionHooks
2123
import com.expediagroup.graphql.spring.execution.SubscriptionHandler
2224
import com.expediagroup.graphql.spring.execution.SubscriptionWebSocketHandler
2325
import com.expediagroup.graphql.spring.operations.Subscription
@@ -57,8 +59,16 @@ class SubscriptionAutoConfiguration {
5759
fun websocketHandlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()
5860

5961
@Bean
60-
fun apolloSubscriptionProtocolHandler(config: GraphQLConfigurationProperties, handler: SubscriptionHandler, objectMapper: ObjectMapper) =
61-
ApolloSubscriptionProtocolHandler(config, handler, objectMapper)
62+
@ConditionalOnMissingBean
63+
fun apolloSubscriptionHooks(): ApolloSubscriptionHooks = SimpleSubscriptionHooks()
64+
65+
@Bean
66+
fun apolloSubscriptionProtocolHandler(
67+
config: GraphQLConfigurationProperties,
68+
handler: SubscriptionHandler,
69+
objectMapper: ObjectMapper,
70+
apolloSubscriptionHooks: ApolloSubscriptionHooks
71+
) = ApolloSubscriptionProtocolHandler(config, handler, objectMapper, apolloSubscriptionHooks)
6272

6373
@Bean
6474
fun subscriptionWebSocketHandler(handler: ApolloSubscriptionProtocolHandler, objectMapper: ObjectMapper) =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2019 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.expediagroup.graphql.spring.execution
17+
18+
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
19+
import kotlinx.coroutines.reactor.mono
20+
import org.springframework.web.reactive.socket.WebSocketSession
21+
import reactor.core.publisher.Mono
22+
23+
/**
24+
* Implementation of Apollo Subscription Server Lifecycle Events
25+
* https://www.apollographql.com/docs/graphql-subscriptions/lifecycle-events/
26+
*/
27+
interface ApolloSubscriptionHooks {
28+
/**
29+
* Allows validation of connectionParams prior to starting the connection.
30+
* You can reject the connection by throwing an exception
31+
*/
32+
fun onConnect(connectionParams: Map<String, String>, session: WebSocketSession, graphQLContext: Any?): Mono<Unit> = mono {}
33+
34+
/**
35+
* Called when the client executes a GraphQL operation
36+
*/
37+
fun onOperation(
38+
operationMessage: SubscriptionOperationMessage,
39+
session: WebSocketSession,
40+
graphQLContext: Any?
41+
): Mono<Unit> = mono { }
42+
43+
/**
44+
* Called when client's unsubscribes
45+
*/
46+
fun onOperationComplete(session: WebSocketSession): Mono<Unit> = mono { }
47+
48+
/**
49+
* Called when the client disconnects
50+
*/
51+
fun onDisconnect(session: WebSocketSession, graphQLContext: Any?): Mono<Unit> = mono { }
52+
}
53+
54+
/**
55+
* Default implementation of Apollo Subscription Lifecycle Events.
56+
*/
57+
open class SimpleSubscriptionHooks : ApolloSubscriptionHooks

graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/execution/ApolloSubscriptionProtocolHandler.kt

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.module.kotlin.readValue
3434
import org.slf4j.LoggerFactory
3535
import org.springframework.web.reactive.socket.WebSocketSession
3636
import reactor.core.publisher.Flux
37+
import reactor.core.publisher.Mono
3738
import java.time.Duration
3839

3940
/**
@@ -43,7 +44,8 @@ import java.time.Duration
4344
class ApolloSubscriptionProtocolHandler(
4445
private val config: GraphQLConfigurationProperties,
4546
private val subscriptionHandler: SubscriptionHandler,
46-
private val objectMapper: ObjectMapper
47+
private val objectMapper: ObjectMapper,
48+
private val subscriptionHooks: ApolloSubscriptionHooks
4749
) {
4850
private val sessionState = ApolloSubscriptionSessionState()
4951
private val logger = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler::class.java)
@@ -53,37 +55,39 @@ class ApolloSubscriptionProtocolHandler(
5355

5456
@Suppress("Detekt.TooGenericExceptionCaught")
5557
fun handle(payload: String, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
56-
try {
57-
val operationMessage: SubscriptionOperationMessage = objectMapper.readValue(payload)
58-
59-
logger.debug("GraphQL subscription client message, sessionId=${session.id} operationMessage=$operationMessage")
60-
61-
when (operationMessage.type) {
62-
GQL_CONNECTION_INIT.type -> {
63-
val ackowledgeMessageFlux = Flux.just(acknowledgeMessage)
64-
val keepAliveFlux = getKeepAliveFlux(session)
65-
return ackowledgeMessageFlux.concatWith(keepAliveFlux)
66-
}
67-
GQL_START.type -> return startSubscription(operationMessage, session)
68-
GQL_STOP.type -> return sessionState.stopOperation(session, operationMessage)
69-
GQL_CONNECTION_TERMINATE.type -> {
70-
sessionState.terminateSession(session)
71-
return Flux.empty()
72-
}
73-
else -> {
74-
logger.error("Unknown subscription operation $operationMessage")
75-
sessionState.stopOperation(session, operationMessage)
76-
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
58+
val operationMessage = convertToMessageOrNull(payload) ?: return Flux.just(basicConnectionErrorMessage)
59+
logger.debug("GraphQL subscription client message, sessionId=${session.id} operationMessage=$operationMessage")
60+
return Mono.subscriberContext().flatMapMany<SubscriptionOperationMessage> { reactorContext ->
61+
try {
62+
val graphQLContext = reactorContext.getOrDefault<Any>(GRAPHQL_CONTEXT_KEY, null)
63+
when (operationMessage.type) {
64+
GQL_CONNECTION_INIT.type -> onInit(operationMessage, session, graphQLContext)
65+
GQL_START.type -> onStart(operationMessage, session, graphQLContext)
66+
GQL_STOP.type -> onStop(operationMessage, session)
67+
GQL_CONNECTION_TERMINATE.type -> onDisconnect(session, graphQLContext)
68+
else -> {
69+
logger.error("Unknown subscription operation $operationMessage")
70+
sessionState.stopOperation(session, operationMessage)
71+
Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
72+
}
7773
}
74+
} catch (exception: Exception) {
75+
logger.error("Error parsing the subscription message", exception)
76+
Flux.just(basicConnectionErrorMessage)
7877
}
78+
}
79+
}
80+
@Suppress("Detekt.TooGenericExceptionCaught")
81+
private fun convertToMessageOrNull(payload: String): SubscriptionOperationMessage? {
82+
return try {
83+
objectMapper.readValue(payload)
7984
} catch (exception: Exception) {
8085
logger.error("Error parsing the subscription message", exception)
81-
return Flux.just(basicConnectionErrorMessage)
86+
null
8287
}
8388
}
84-
8589
/**
86-
* If the keep alive configuraation is set, send a message back to client at every interval until the session is terminated.
90+
* If the keep alive configuration is set, send a message back to client at every interval until the session is terminated.
8791
* Otherwise just return empty flux to append to the acknowledge message.
8892
*/
8993
private fun getKeepAliveFlux(session: WebSocketSession): Flux<SubscriptionOperationMessage> {
@@ -98,7 +102,10 @@ class ApolloSubscriptionProtocolHandler(
98102
}
99103

100104
@Suppress("Detekt.TooGenericExceptionCaught")
101-
private fun startSubscription(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
105+
private fun startSubscription(
106+
operationMessage: SubscriptionOperationMessage,
107+
session: WebSocketSession
108+
): Flux<SubscriptionOperationMessage> {
102109
if (operationMessage.id == null) {
103110
logger.error("GraphQL subscription operation id is required")
104111
return Flux.just(basicConnectionErrorMessage)
@@ -135,4 +142,40 @@ class ApolloSubscriptionProtocolHandler(
135142
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
136143
}
137144
}
145+
146+
private fun onInit(operationMessage: SubscriptionOperationMessage, session: WebSocketSession, graphQLContext: Any?): Flux<SubscriptionOperationMessage> {
147+
val connectionParams = operationMessage.payload as? Map<String, String> ?: emptyMap()
148+
val onConnect = subscriptionHooks.onConnect(connectionParams, session, graphQLContext)
149+
sessionState.saveOnConnectHook(session, onConnect)
150+
val acknowledgeMessageFlux = Flux.just(acknowledgeMessage)
151+
val keepAliveFlux = getKeepAliveFlux(session)
152+
return acknowledgeMessageFlux.concatWith(keepAliveFlux)
153+
}
154+
155+
private fun onStart(
156+
operationMessage: SubscriptionOperationMessage,
157+
session: WebSocketSession,
158+
graphQLContext: Any?
159+
): Flux<SubscriptionOperationMessage> {
160+
val onConnect = sessionState.onConnect(session) ?: subscriptionHooks.onConnect(emptyMap(), session, graphQLContext)
161+
return onConnect.flatMap { subscriptionHooks.onOperation(operationMessage, session, graphQLContext) }
162+
.flatMapMany { startSubscription(operationMessage, session) }
163+
}
164+
165+
private fun onStop(
166+
operationMessage: SubscriptionOperationMessage,
167+
session: WebSocketSession
168+
): Flux<SubscriptionOperationMessage> = subscriptionHooks.onOperationComplete(session)
169+
.flatMapMany {
170+
sessionState.stopOperation(session, operationMessage)
171+
}
172+
173+
private fun onDisconnect(
174+
session: WebSocketSession,
175+
graphQLContext: Any?
176+
): Flux<SubscriptionOperationMessage> = subscriptionHooks.onDisconnect(session, graphQLContext)
177+
.flatMapMany {
178+
sessionState.terminateSession(session)
179+
Flux.empty<SubscriptionOperationMessage>()
180+
}
138181
}

0 commit comments

Comments
 (0)