Skip to content

feat: FederatedTypePromiseResolver to settle all promises regardless of errors #1753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import org.dataloader.DataLoader
import java.util.concurrent.CompletableFuture

/**
* Check if all futures collected on [KotlinDataLoaderRegistry.dispatchAll] were handled and we have more futures than we
* had when we started to dispatch, if so, means that [DataLoader]s were chained
* Check if all futures collected on [KotlinDataLoaderRegistry.dispatchAll] were handled
* and if we have more futures than we had when we started to dispatch, if so,
* means that [DataLoader]s were chained so we need to dispatch the dataLoaderRegistry.
*/
fun <V> CompletableFuture<V>.dispatchIfNeeded(
environment: DataFetchingEnvironment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.expediagroup.graphql.generator.federation.exception.InvalidFederatedR
import com.expediagroup.graphql.generator.federation.execution.resolverexecutor.FederatedTypePromiseResolverExecutor
import com.expediagroup.graphql.generator.federation.execution.resolverexecutor.FederatedTypeSuspendResolverExecutor
import com.expediagroup.graphql.generator.federation.execution.resolverexecutor.ResolvableEntity
import com.expediagroup.graphql.generator.federation.extensions.collectAll
import com.expediagroup.graphql.generator.federation.extensions.joinAll
import com.expediagroup.graphql.generator.federation.extensions.toDataFetcherResult
import graphql.execution.DataFetcherResult
import graphql.schema.DataFetcher
Expand Down Expand Up @@ -91,7 +91,7 @@ open class EntitiesDataFetcher(
)

return promises
.collectAll()
.joinAll()
.thenApply { results ->
results.asSequence()
.flatten()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2023 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,8 @@ package com.expediagroup.graphql.generator.federation.execution.resolverexecutor

import com.expediagroup.graphql.generator.federation.exception.FederatedRequestFailure
import com.expediagroup.graphql.generator.federation.execution.FederatedTypePromiseResolver
import com.expediagroup.graphql.generator.federation.extensions.collectAll
import com.expediagroup.graphql.generator.federation.extensions.allSettled
import com.expediagroup.graphql.generator.federation.extensions.joinAll
import graphql.schema.DataFetchingEnvironment
import java.util.concurrent.CompletableFuture

Expand All @@ -27,9 +28,11 @@ object FederatedTypePromiseResolverExecutor : TypeResolverExecutor<FederatedType
resolvableEntities: List<ResolvableEntity<FederatedTypePromiseResolver<*>>>,
environment: DataFetchingEnvironment
): CompletableFuture<List<Map<Int, Any?>>> =
resolvableEntities.map { resolvableEntity ->
resolveEntity(resolvableEntity, environment)
}.collectAll()
resolvableEntities
.map { resolvableEntity ->
resolveEntity(resolvableEntity, environment)
}
.joinAll()

@Suppress("TooGenericExceptionCaught")
private fun resolveEntity(
Expand All @@ -38,20 +41,36 @@ object FederatedTypePromiseResolverExecutor : TypeResolverExecutor<FederatedType
): CompletableFuture<Map<Int, Any?>> {
val indexes = resolvableEntity.indexedRepresentations.map(IndexedValue<Map<String, Any>>::index)
val representations = resolvableEntity.indexedRepresentations.map(IndexedValue<Map<String, Any>>::value)
val resultsPromise = representations.map { representation ->
try {
resolvableEntity.resolver.resolve(environment, representation)
} catch (e: Exception) {
CompletableFuture.completedFuture(
FederatedRequestFailure(
"Exception was thrown while trying to resolve federated type, representation=$representation",
e
return representations
.map { representation ->
// synchronous exceptions while returning CompletableFuture
// as nothing stops users to synchronously throw an exception
try {
resolvableEntity.resolver.resolve(environment, representation)
} catch (e: Exception) {
CompletableFuture.completedFuture(
FederatedRequestFailure(
"Exception was thrown while trying to resolve federated type, representation=$representation",
e
)
)
)
}
}
.allSettled()
.thenApply { results ->
indexes.zip(
results.mapIndexed { index, result ->
// asynchronous exceptions while completing CompletableFuture
try {
result.getOrThrow()
} catch (e: Exception) {
FederatedRequestFailure(
"Exception was thrown while trying to resolve federated type, representation=${representations[index]}",
e
)
}
}
).toMap()
}
}.collectAll()
return resultsPromise.thenApply { results ->
indexes.zip(results).toMap()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2023 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,42 @@ package com.expediagroup.graphql.generator.federation.extensions
import java.util.concurrent.CompletableFuture

/**
* Returns a new CompletableFuture of a list with the resolved values of the given CompletableFutures.
* the returned completableFuture will complete when all given CompletableFutures complete.
* Returns a [CompletableFuture] that completes when all the input futures have completed,
* with a list of the resolved values obtained from the completed futures.
* If any of the input futures complete exceptionally, then the returned [CompletableFuture] also completes exceptionally
* with a [java.util.concurrent.CompletionException] holding the exception as its cause.
*
* @return a [CompletableFuture] that completes with a list of resolved values.
*/
internal fun <T : Any?> List<CompletableFuture<out T>>.collectAll(): CompletableFuture<List<T>> =
internal fun <T : Any?> List<CompletableFuture<out T>>.joinAll(): CompletableFuture<List<T>> =
CompletableFuture.allOf(
*toTypedArray()
).thenApply {
map(CompletableFuture<out T>::join)
}

/**
* Returns a [CompletableFuture] that completes when all the input futures have completed,
* with a list of [Result] objects that indicate whether each future completed successfully or with an error.
* If a future completed with an error, the corresponding [Result] object will contain the exception that was thrown.
*
* @return a [CompletableFuture] that completes with a list of [Result] objects.
*/
@Suppress("TooGenericExceptionCaught")
internal fun <T : Any?> List<CompletableFuture<out T>>.allSettled(): CompletableFuture<List<Result<T>>> {
val resultFutures = map { future ->
CompletableFuture.supplyAsync {
try {
Result.success(future.get())
} catch (e: Exception) {
Result.failure<T>(e)
}
}
}

return CompletableFuture.allOf(
*resultFutures.toTypedArray()
).thenApply {
resultFutures.map(CompletableFuture<Result<T>>::join)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2023 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@ import io.mockk.verify
import org.junit.jupiter.api.Test
import reactor.kotlin.core.publisher.toMono
import java.time.Duration
import java.util.concurrent.CompletableFuture
import kotlin.test.assertEquals
import kotlin.test.assertIs

Expand Down Expand Up @@ -82,7 +83,7 @@ class FederatedTypePromiseResolverExecutorTest {
}

@Test
fun `resolver maps the value to a failure when the federated resolver throws an exception`() {
fun `resolver maps the value to a failure when the federated resolver throws an exception synchronously`() {
val representation1 = mapOf("__typename" to "MyType", "id" to 1)
val representation2 = mapOf("__typename" to "MyType", "id" to 2)

Expand Down Expand Up @@ -115,4 +116,41 @@ class FederatedTypePromiseResolverExecutorTest {
mockResolver.resolve(any(), representation2)
}
}

@Test
fun `resolver maps the value to a failure when the federated resolver throws an exception asynchronously`() {
val representation1 = mapOf("__typename" to "MyType", "id" to 1)
val representation2 = mapOf("__typename" to "MyType", "id" to 2)

val indexedRequests: List<IndexedValue<Map<String, Any>>> = listOf(
IndexedValue(1, representation1),
IndexedValue(2, representation2)
)

val mockResolver = mockk<FederatedTypePromiseResolver<*>> {
every { typeName } returns "MyType"
every { resolve(any(), representation1) } returns "MyType1".toMono().delayElement(Duration.ofMillis(100)).toFuture()
every { resolve(any(), representation2) } returns CompletableFuture.supplyAsync {
throw Exception("async exception")
}
}

val resolvableEntity = ResolvableEntity("MyType", indexedRequests, mockResolver)
val environment = mockk<DataFetchingEnvironment> {
every { graphQlContext } returns GraphQLContext.newContext().build()
}

val result = FederatedTypePromiseResolverExecutor.execute(listOf(resolvableEntity), environment).get()
assertEquals(1, result.size)

val resolverResults = result[0]

assertEquals("MyType1", resolverResults[1])
assertIs<FederatedRequestFailure>(resolverResults[2])

verify(exactly = 1) {
mockResolver.resolve(any(), representation1)
mockResolver.resolve(any(), representation2)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.generator.federation.extensions

import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.asCompletableFuture
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import reactor.kotlin.core.publisher.toMono
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertTrue

@OptIn(DelicateCoroutinesApi::class)
class CompletableFutureExtensionsKtTest {
@Test
fun `joinAll asynchronously collects a list of completable futures of elements into a completable future list of elements`() {
val firstPromise = "first promise".toMono().delayElement(Duration.ofMillis(500)).toFuture()
val secondPromise = GlobalScope.async {
delay(400)
"second promise"
}.asCompletableFuture()
val thirdPromise = CompletableFuture.completedFuture("third promise")

val result = listOf(firstPromise, secondPromise, thirdPromise).joinAll().join()

assertEquals(3, result.size)
assertEquals("first promise", result[0])
assertEquals("second promise", result[1])
assertEquals("third promise", result[2])
}

@Test
fun `joinAll throws an exception with a completableFuture completes exceptionally`() {
val firstPromise = "first promise".toMono().delayElement(Duration.ofMillis(500)).toFuture()
val secondPromise = GlobalScope.async {
delay(400)
"second promise"
}.asCompletableFuture()
val thirdPromise = CompletableFuture.supplyAsync {
throw Exception("async exception")
}

assertThrows<CompletionException> {
listOf(firstPromise, secondPromise, thirdPromise).joinAll().join()
}
}

@Test
fun `allSettled asynchronously collects a list of completable futures of elements into a completable future list of elements`() {
val firstPromise = "first promise".toMono().delayElement(Duration.ofMillis(500)).toFuture()
val secondPromise = GlobalScope.async {
delay(400)
"second promise"
}.asCompletableFuture()
val thirdPromise = CompletableFuture.completedFuture("third promise")

val result = listOf(firstPromise, secondPromise, thirdPromise).allSettled().join()

assertEquals(3, result.size)
assertEquals("first promise", result[0].getOrNull())
assertEquals("second promise", result[1].getOrNull())
assertEquals("third promise", result[2].getOrNull())
}

@Test
fun `allSettled asynchronously collects a list of completable futures of elements even if a completable future completes exceptionally`() {
val firstPromise = "first promise".toMono().delayElement(Duration.ofMillis(500)).toFuture()
val secondPromise = GlobalScope.async {
delay(400)
"second promise"
}.asCompletableFuture()
val thirdPromise = CompletableFuture.supplyAsync {
throw Exception("async exception")
}

val result = listOf(firstPromise, secondPromise, thirdPromise).allSettled().join()

assertEquals(3, result.size)
assertEquals("first promise", result[0].getOrNull())
assertEquals("second promise", result[1].getOrNull())
assertTrue(result[2].isFailure)
assertIs<ExecutionException>(result[2].exceptionOrNull())
assertEquals("async exception", result[2].exceptionOrNull()?.cause?.message)
}
}