Skip to content

Lock ServerCall in ServerCalls re: #151 #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 11, 2020
15 changes: 7 additions & 8 deletions stub/src/main/java/io/grpc/kotlin/ServerCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import io.grpc.Metadata as GrpcMetadata
Expand Down Expand Up @@ -237,24 +239,21 @@ object ServerCalls {
}

val rpcScope = CoroutineScope(context)
val rpcJob = rpcScope.async {
runCatching {
rpcScope.async {
val mutex = Mutex()
val failure = runCatching {
implementation(requests).collect {
readiness.suspendUntilReady()
call.sendMessage(it)
mutex.withLock { call.sendMessage(it) }
}
}.exceptionOrNull()
}

rpcJob.invokeOnCompletion { cause ->
val failure = cause ?: rpcJob.doneValue
val closeStatus = when (failure) {
null -> Status.OK
is CancellationException -> Status.CANCELLED.withCause(failure)
else -> Status.fromThrowable(failure)
}
val trailers = failure?.let { Status.trailersFromThrowable(it) } ?: GrpcMetadata()
call.close(closeStatus, trailers)
mutex.withLock { call.close(closeStatus, trailers) }
}

return object: ServerCall.Listener<RequestT>() {
Expand Down
19 changes: 0 additions & 19 deletions stub/src/test/java/io/grpc/kotlin/GeneratedCodeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -329,25 +329,6 @@ class GeneratedCodeTest : AbstractCallsTest() {
test.join()
}

@Test
fun serverScopeCancelledBeforeRpc() = runBlocking {
val serverJob = Job()
val channel = makeChannel(
object : GreeterCoroutineImplBase(serverJob) {
override suspend fun sayHello(request: HelloRequest): HelloReply {
suspendUntilCancelled { /* do nothing */ }
}
}
)

serverJob.cancel()
val stub = GreeterCoroutineStub(channel)
val ex = assertThrows<StatusException> {
stub.sayHello(helloRequest("Greg"))
}
assertThat(ex.status.code).isEqualTo(Status.Code.CANCELLED)
}

@Test
fun serviceDescriptor() {
assertThat(GreeterGrpcKt.serviceDescriptor).isEqualTo(GreeterGrpc.getServiceDescriptor())
Expand Down