Skip to content

Commit ef06b07

Browse files
authored
Lock ServerCall in ServerCalls re: #151 (#157)
Add a lock to accesses to ServerCall, which should minimally affect performance (everything should be sequential anyway) but protects against thread safety issues.
1 parent a5684bf commit ef06b07

File tree

2 files changed

+7
-27
lines changed

2 files changed

+7
-27
lines changed

stub/src/main/java/io/grpc/kotlin/ServerCalls.kt

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import kotlinx.coroutines.flow.Flow
3636
import kotlinx.coroutines.flow.collect
3737
import kotlinx.coroutines.flow.flow
3838
import kotlinx.coroutines.flow.map
39+
import kotlinx.coroutines.sync.Mutex
40+
import kotlinx.coroutines.sync.withLock
3941
import java.util.concurrent.atomic.AtomicBoolean
4042
import kotlin.coroutines.CoroutineContext
4143
import io.grpc.Metadata as GrpcMetadata
@@ -237,24 +239,21 @@ object ServerCalls {
237239
}
238240

239241
val rpcScope = CoroutineScope(context)
240-
val rpcJob = rpcScope.async {
241-
runCatching {
242+
rpcScope.async {
243+
val mutex = Mutex()
244+
val failure = runCatching {
242245
implementation(requests).collect {
243246
readiness.suspendUntilReady()
244-
call.sendMessage(it)
247+
mutex.withLock { call.sendMessage(it) }
245248
}
246249
}.exceptionOrNull()
247-
}
248-
249-
rpcJob.invokeOnCompletion { cause ->
250-
val failure = cause ?: rpcJob.doneValue
251250
val closeStatus = when (failure) {
252251
null -> Status.OK
253252
is CancellationException -> Status.CANCELLED.withCause(failure)
254253
else -> Status.fromThrowable(failure)
255254
}
256255
val trailers = failure?.let { Status.trailersFromThrowable(it) } ?: GrpcMetadata()
257-
call.close(closeStatus, trailers)
256+
mutex.withLock { call.close(closeStatus, trailers) }
258257
}
259258

260259
return object: ServerCall.Listener<RequestT>() {

stub/src/test/java/io/grpc/kotlin/GeneratedCodeTest.kt

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -329,25 +329,6 @@ class GeneratedCodeTest : AbstractCallsTest() {
329329
test.join()
330330
}
331331

332-
@Test
333-
fun serverScopeCancelledBeforeRpc() = runBlocking {
334-
val serverJob = Job()
335-
val channel = makeChannel(
336-
object : GreeterCoroutineImplBase(serverJob) {
337-
override suspend fun sayHello(request: HelloRequest): HelloReply {
338-
suspendUntilCancelled { /* do nothing */ }
339-
}
340-
}
341-
)
342-
343-
serverJob.cancel()
344-
val stub = GreeterCoroutineStub(channel)
345-
val ex = assertThrows<StatusException> {
346-
stub.sayHello(helloRequest("Greg"))
347-
}
348-
assertThat(ex.status.code).isEqualTo(Status.Code.CANCELLED)
349-
}
350-
351332
@Test
352333
fun serviceDescriptor() {
353334
assertThat(GreeterGrpcKt.serviceDescriptor).isEqualTo(GreeterGrpc.getServiceDescriptor())

0 commit comments

Comments
 (0)