Skip to content

Commit 3a91e41

Browse files
samuelAndalonSamuel Vazquez
authored and
Samuel Vazquez
committed
feat(batching): v7 synchronize executions when attempting to remove an entry (#2013)
### 📝 Description cherry-pick #2012
1 parent 9eabdde commit 3a91e41

File tree

3 files changed

+34
-32
lines changed

3 files changed

+34
-32
lines changed

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,25 @@ fun <V> CompletableFuture<V>.dispatchIfNeeded(
3737
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()
3838

3939
if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
40-
val cantContinueExecution = when {
40+
when {
4141
environment.graphQlContext.hasKey(ExecutionLevelDispatchedState::class) -> {
42-
environment
43-
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
44-
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
42+
val cantContinueExecution =
43+
environment
44+
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
45+
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
46+
if (cantContinueExecution) {
47+
dataLoaderRegistry.dispatchAll()
48+
}
4549
}
4650
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
4751
environment
4852
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
49-
.allSyncExecutionsExhausted()
53+
.ifAllSyncExecutionsExhausted {
54+
dataLoaderRegistry.dispatchAll()
55+
}
5056
}
5157
else -> throw MissingInstrumentationStateException()
5258
}
53-
54-
if (cantContinueExecution) {
55-
dataLoaderRegistry.dispatchAll()
56-
}
5759
}
5860
return this
5961
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,12 @@ class SyncExecutionExhaustedState(
8484
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
8585
if ((result != null && result.errors.size > 0) || t != null) {
8686
if (executions.containsKey(parameters.executionInput.executionId)) {
87-
executions.remove(parameters.executionInput.executionId)
88-
totalExecutions.set(totalExecutions.get() - 1)
89-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
90-
if (allSyncExecutionsExhausted) {
91-
onSyncExecutionExhausted(executions.keys().toList())
87+
synchronized(executions) {
88+
executions.remove(parameters.executionInput.executionId)
89+
totalExecutions.set(totalExecutions.get() - 1)
90+
}
91+
ifAllSyncExecutionsExhausted { executionIds ->
92+
onSyncExecutionExhausted(executionIds)
9293
}
9394
}
9495
}
@@ -147,9 +148,8 @@ class SyncExecutionExhaustedState(
147148
executionState
148149
}
149150

150-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
151-
if (allSyncExecutionsExhausted) {
152-
onSyncExecutionExhausted(executions.keys().toList())
151+
ifAllSyncExecutionsExhausted { executionIds ->
152+
onSyncExecutionExhausted(executionIds)
153153
}
154154
}
155155
override fun onCompleted(result: Any?, t: Throwable?) {
@@ -158,26 +158,26 @@ class SyncExecutionExhaustedState(
158158
executionState
159159
}
160160

161-
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
162-
if (allSyncExecutionsExhausted) {
163-
onSyncExecutionExhausted(executions.keys().toList())
161+
ifAllSyncExecutionsExhausted { executionIds ->
162+
onSyncExecutionExhausted(executionIds)
164163
}
165164
}
166165
}
167166
}
168167

169168
/**
170-
* Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution
169+
* execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution.
171170
* A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until
172171
* a scalar leaf or a [DataFetcher] that returns a [CompletableFuture]
173172
*/
174-
fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) {
175-
val operationsToExecute = totalExecutions.get()
176-
when {
177-
executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false
178-
else -> {
179-
executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)
173+
fun ifAllSyncExecutionsExhausted(predicate: (List<ExecutionId>) -> Unit) =
174+
synchronized(executions) {
175+
val operationsToExecute = totalExecutions.get()
176+
if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled())
177+
return@synchronized
178+
179+
if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) {
180+
predicate(executions.keys().toList())
180181
}
181182
}
182-
}
183183
}

executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,9 +616,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
616616
fun `Instrumentation should not consider executions that thrown exceptions`() {
617617
val executions = listOf(
618618
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
619-
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
620-
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
621-
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
619+
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
620+
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
621+
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build()
622622
)
623623

624624
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
@@ -633,7 +633,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
633633
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics
634634

635635
assertEquals(1, astronautStatistics?.batchInvokeCount)
636-
assertEquals(2, astronautStatistics?.batchLoadCount)
636+
assertEquals(1, astronautStatistics?.batchLoadCount)
637637

638638
assertEquals(1, missionStatistics?.batchInvokeCount)
639639
assertEquals(1, missionStatistics?.batchLoadCount)

0 commit comments

Comments
 (0)