Skip to content

Commit c7e23e0

Browse files
feat(batching): v7 synchronize executions when attempting to remove an entry (#2013)
### 📝 Description cherry-pick #2012
1 parent b3795bb commit c7e23e0

File tree

3 files changed

+34
-32
lines changed

3 files changed

+34
-32
lines changed

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,25 @@ fun <V> CompletableFuture<V>.dispatchIfNeeded(
3737
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()
3838

3939
if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
40-
val cantContinueExecution = when {
40+
when {
4141
environment.graphQlContext.hasKey(ExecutionLevelDispatchedState::class) -> {
42-
environment
43-
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
44-
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
42+
val cantContinueExecution =
43+
environment
44+
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
45+
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
46+
if (cantContinueExecution) {
47+
dataLoaderRegistry.dispatchAll()
48+
}
4549
}
4650
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
4751
environment
4852
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
49-
.allSyncExecutionsExhausted()
53+
.ifAllSyncExecutionsExhausted {
54+
dataLoaderRegistry.dispatchAll()
55+
}
5056
}
5157
else -> throw MissingInstrumentationStateException()
5258
}
53-
54-
if (cantContinueExecution) {
55-
dataLoaderRegistry.dispatchAll()
56-
}
5759
}
5860
return this
5961
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,12 @@ class SyncExecutionExhaustedState(
6363
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
6464
if ((result != null && result.errors.size > 0) || t != null) {
6565
if (executions.containsKey(parameters.executionInput.executionId)) {
66-
executions.remove(parameters.executionInput.executionId)
67-
totalExecutions.set(totalExecutions.get() - 1)
68-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
69-
if (allSyncExecutionsExhausted) {
70-
onSyncExecutionExhausted(executions.keys().toList())
66+
synchronized(executions) {
67+
executions.remove(parameters.executionInput.executionId)
68+
totalExecutions.set(totalExecutions.get() - 1)
69+
}
70+
ifAllSyncExecutionsExhausted { executionIds ->
71+
onSyncExecutionExhausted(executionIds)
7172
}
7273
}
7374
}
@@ -126,9 +127,8 @@ class SyncExecutionExhaustedState(
126127
executionState
127128
}
128129

129-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
130-
if (allSyncExecutionsExhausted) {
131-
onSyncExecutionExhausted(executions.keys().toList())
130+
ifAllSyncExecutionsExhausted { executionIds ->
131+
onSyncExecutionExhausted(executionIds)
132132
}
133133
}
134134
override fun onCompleted(result: Any?, t: Throwable?) {
@@ -137,26 +137,26 @@ class SyncExecutionExhaustedState(
137137
executionState
138138
}
139139

140-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
141-
if (allSyncExecutionsExhausted) {
142-
onSyncExecutionExhausted(executions.keys().toList())
140+
ifAllSyncExecutionsExhausted { executionIds ->
141+
onSyncExecutionExhausted(executionIds)
143142
}
144143
}
145144
}
146145
}
147146

148147
/**
149-
* Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution
148+
* execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution.
150149
* A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until
151150
* a scalar leaf or a [DataFetcher] that returns a [CompletableFuture]
152151
*/
153-
fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) {
154-
val operationsToExecute = totalExecutions.get()
155-
when {
156-
executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false
157-
else -> {
158-
executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)
152+
fun ifAllSyncExecutionsExhausted(predicate: (List<ExecutionId>) -> Unit) =
153+
synchronized(executions) {
154+
val operationsToExecute = totalExecutions.get()
155+
if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled())
156+
return@synchronized
157+
158+
if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) {
159+
predicate(executions.keys().toList())
159160
}
160161
}
161-
}
162162
}

executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -614,9 +614,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
614614
fun `Instrumentation should not consider executions that thrown exceptions`() {
615615
val executions = listOf(
616616
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
617-
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
618-
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
619-
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
617+
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
618+
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
619+
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build()
620620
)
621621

622622
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
@@ -631,7 +631,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
631631
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics
632632

633633
assertEquals(1, astronautStatistics?.batchInvokeCount)
634-
assertEquals(2, astronautStatistics?.batchLoadCount)
634+
assertEquals(1, astronautStatistics?.batchLoadCount)
635635

636636
assertEquals(1, missionStatistics?.batchInvokeCount)
637637
assertEquals(1, missionStatistics?.batchLoadCount)

0 commit comments

Comments
 (0)