Skip to content

Fix Apollo subscription message handling #371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions detekt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ complexity:
ComplexInterface:
threshold: 15

naming:
FunctionMaxLength:
maximumFunctionNameLength: 35

style:
MagicNumber:
ignoreEnums: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package com.expediagroup.graphql.sample.subscriptions

import com.expediagroup.graphql.annotations.GraphQLDescription
import com.expediagroup.graphql.spring.operations.Subscription
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Publisher
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
Expand All @@ -32,4 +35,17 @@ class SimpleSubscription : Subscription {

@GraphQLDescription("Returns a random number every second")
fun counter(): Flux<Int> = Flux.interval(Duration.ofSeconds(1)).map { Random.nextInt() }

@GraphQLDescription("Returns a random number every second, errors if even")
fun counterWithError(): Flux<Int> = Flux.interval(Duration.ofSeconds(1))
.map {
val value = Random.nextInt()
if (value % 2 == 0) {
throw Exception("Value is even $value")
}
else value
}

@GraphQLDescription("Returns list of values")
fun flow(): Publisher<Int> = flowOf(1, 2, 4).asPublisher()
}
3 changes: 3 additions & 0 deletions examples/spring/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
graphql:
packages:
- "com.expediagroup.graphql.sample"
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.expediagroup.graphql.spring

import com.expediagroup.graphql.spring.execution.ApolloSubscriptionProtocolHandler
import com.expediagroup.graphql.spring.execution.SimpleSubscriptionHandler
import com.expediagroup.graphql.spring.execution.SubscriptionHandler
import com.expediagroup.graphql.spring.execution.SubscriptionWebSocketHandler
Expand Down Expand Up @@ -47,7 +48,12 @@ class SubscriptionAutoConfiguration {
fun websocketHandlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()

@Bean
fun subscriptionWebSocketHandler(handler: SubscriptionHandler, objectMapper: ObjectMapper) = SubscriptionWebSocketHandler(handler, objectMapper)
fun apolloSubscriptionProtocolHandler(handler: SubscriptionHandler, objectMapper: ObjectMapper) =
ApolloSubscriptionProtocolHandler(handler, objectMapper)

@Bean
fun subscriptionWebSocketHandler(handler: ApolloSubscriptionProtocolHandler, objectMapper: ObjectMapper) =
SubscriptionWebSocketHandler(handler, objectMapper)

@Bean
fun subscriptionHandlerMapping(config: GraphQLConfigurationProperties, subscriptionWebSocketHandler: SubscriptionWebSocketHandler): HandlerMapping =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2019 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.spring.exception

import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage

class UknownSubscriptionOperationType(operationMessage: SubscriptionOperationMessage) :
IllegalArgumentException("Unknown subscription operation $operationMessage")
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2019 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.spring.execution

import com.expediagroup.graphql.spring.model.GraphQLRequest
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_TERMINATE
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_START
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_STOP
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_CONNECTION_ACK
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_CONNECTION_ERROR
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_DATA
import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ServerMessages.GQL_ERROR
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.readValue
import org.slf4j.LoggerFactory
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Flux

class ApolloSubscriptionProtocolHandler(
private val subscriptionHandler: SubscriptionHandler,
private val objectMapper: ObjectMapper
) {

private val logger = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler::class.java)

@Suppress("Detekt.TooGenericExceptionCaught")
fun handle(payload: String, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
try {
val operationMessage: SubscriptionOperationMessage = objectMapper.readValue(payload)

return when {
operationMessage.type == GQL_CONNECTION_INIT.type -> Flux.just(SubscriptionOperationMessage(GQL_CONNECTION_ACK.type))
operationMessage.type == GQL_START.type -> startSubscription(operationMessage, session)
operationMessage.type == GQL_CONNECTION_TERMINATE.type || operationMessage.type == GQL_STOP.type -> {
session.close()
Flux.empty()
}
else -> {
logger.error("Unknown subscription operation $operationMessage")
Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
}
} catch (exception: Exception) {
logger.error("Error parsing the subscription message", exception)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type))
}
}

@Suppress("Detekt.TooGenericExceptionCaught")
private fun startSubscription(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
val payload = operationMessage.payload

if (payload == null) {
logger.error("Payload was null instead of a GraphQLRequest object")
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}

return try {
val request = objectMapper.convertValue<GraphQLRequest>(payload)
subscriptionHandler.executeSubscription(request)
.map {
if (it.errors?.isNotEmpty() == true) {
SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it)
} else {
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
}
}
.doOnSubscribe { logger.trace("WebSocket GraphQL subscription subscribe, WebSocketID=${session.id} OperationMessageID=${operationMessage.id}") }
.doOnCancel { logger.trace("WebSocket GraphQL subscription cancel, WebSocketID=${session.id} OperationMessageID=${operationMessage.id}") }
.doOnComplete { logger.trace("WebSocket GraphQL subscription complete, WebSocketID=${session.id} OperationMessageID=${operationMessage.id}") }
.doFinally {
val completeMessage = SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = operationMessage.id)
val text = objectMapper.writeValueAsString(completeMessage)
session.send(Flux.just(session.textMessage(text)))
session.close()
}
} catch (exception: Exception) {
logger.error("Error running graphql subscription", exception)
Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package com.expediagroup.graphql.spring.execution

import com.expediagroup.graphql.spring.model.GraphQLRequest
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.slf4j.LoggerFactory
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
Expand All @@ -28,23 +25,14 @@ import reactor.core.publisher.Mono
* Default WebSocket handler for handling GraphQL subscriptions.
*/
class SubscriptionWebSocketHandler(
private val subscriptionHandler: SubscriptionHandler,
private val subscriptionHandler: ApolloSubscriptionProtocolHandler,
private val objectMapper: ObjectMapper
) : WebSocketHandler {

private val logger = LoggerFactory.getLogger(SubscriptionWebSocketHandler::class.java)

@Suppress("ForbiddenVoid")
override fun handle(session: WebSocketSession): Mono<Void> {
val response = session.receive()
.concatMap {
val graphQLRequest = objectMapper.readValue<GraphQLRequest>(it.payloadAsText)
subscriptionHandler.executeSubscription(graphQLRequest)
.doOnSubscribe { logger.trace("WebSocket GraphQL subscription subscribe, ID=${session.id}") }
.doOnCancel { logger.trace("WebSocket GraphQL subscription cancel, ID=${session.id}") }
.doOnComplete { logger.trace("WebSocket GraphQL subscription complete, ID=${session.id}") }
.doFinally { session.close() }
}
.flatMap { subscriptionHandler.handle(it.payloadAsText, session) }
.map { objectMapper.writeValueAsString(it) }
.map { session.textMessage(it) }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.spring.model

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

/**
* The `graphql-ws` protocol from Apollo Client has some special text messages to signal events.
* Along with the HTTP WebSocket event handling we need to have some extra logic
*
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
*/
@JsonIgnoreProperties(ignoreUnknown = true)
data class SubscriptionOperationMessage(
val type: String,
val id: String? = null,
val payload: Any? = null
) {
enum class ClientMessages(val type: String) {
GQL_CONNECTION_INIT("connection_init"),
GQL_START("start"),
GQL_STOP("stop"),
GQL_CONNECTION_TERMINATE("connection_terminate")
}

enum class ServerMessages(val type: String) {
GQL_CONNECTION_ACK("connection_ack"),
GQL_CONNECTION_ERROR("connection_error"),
GQL_DATA("data"),
GQL_ERROR("error"),
GQL_COMPLETE("complete"),
GQL_CONNECTION_KEEP_ALIVE("ka")
}
}
Loading