Skip to content

Fix sendability issues in the connection pool #833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
uses: apple/swift-nio/.github/workflows/unit_tests.yml@main
with:
linux_5_9_enabled: false
linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_5_10_arguments_override: "--explicit-target-dependency-import-check error"
linux_6_0_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_1_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: apple/swift-nio/.github/workflows/unit_tests.yml@main
with:
linux_5_9_enabled: false
linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_5_10_arguments_override: "--explicit-target-dependency-import-check error"
linux_6_0_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_1_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.runTimeoutAction(timeoutAction, context: context)
}

req.willExecuteRequest(self)
req.willExecuteRequest(self.requestExecutor)

let action = self.state.runNewRequest(
head: req.requestHead,
Expand Down Expand Up @@ -323,7 +323,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
Expand Down Expand Up @@ -396,7 +396,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.")

let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -409,7 +409,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

self.currentIdleReadTimeoutTimerID &+= 1
let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -431,7 +431,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
assert(self.idleWriteTimeoutTimer == nil, "Expected there is no timeout timer so far.")

let timerID = self.currentIdleWriteTimeoutTimerID
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
let action = self.state.idleWriteTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -443,7 +443,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

self.currentIdleWriteTimeoutTimerID &+= 1
let timerID = self.currentIdleWriteTimeoutTimerID
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
let action = self.state.idleWriteTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -461,8 +461,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

// MARK: Private HTTPRequestExecutor

private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?)
{
fileprivate func writeRequestBodyPart0(
_ data: IOData,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
guard self.request === request, let context = self.channelContext else {
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
Expand All @@ -481,7 +484,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
fileprivate func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
promise?.fail(HTTPClientError.requestStreamCancelled)
Expand All @@ -492,7 +495,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

private func demandResponseBodyStream0(_ request: HTTPExecutableRequest) {
fileprivate func demandResponseBodyStream0(_ request: HTTPExecutableRequest) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
return
Expand All @@ -504,7 +507,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

private func cancelRequest0(_ request: HTTPExecutableRequest) {
fileprivate func cancelRequest0(_ request: HTTPExecutableRequest) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
return
Expand All @@ -524,43 +527,39 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
@available(*, unavailable)
extension HTTP1ClientChannelHandler: Sendable {}

extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.writeRequestBodyPart0(data, request: request, promise: promise)
} else {
self.eventLoop.execute {
self.writeRequestBodyPart0(data, request: request, promise: promise)
extension HTTP1ClientChannelHandler {
var requestExecutor: RequestExecutor {
RequestExecutor(self)
}

struct RequestExecutor: HTTPRequestExecutor, Sendable {
private let loopBound: NIOLoopBound<HTTP1ClientChannelHandler>

init(_ handler: HTTP1ClientChannelHandler) {
self.loopBound = NIOLoopBound(handler, eventLoop: handler.eventLoop)
}

func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
self.loopBound.execute {
$0.writeRequestBodyPart0(data, request: request, promise: promise)
}
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.finishRequestBodyStream0(request, promise: promise)
} else {
self.eventLoop.execute {
self.finishRequestBodyStream0(request, promise: promise)
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
self.loopBound.execute {
$0.finishRequestBodyStream0(request, promise: promise)
}
}
}

func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.demandResponseBodyStream0(request)
} else {
self.eventLoop.execute {
self.demandResponseBodyStream0(request)
func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
self.loopBound.execute {
$0.demandResponseBodyStream0(request)
}
}
}

func cancelRequest(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.cancelRequest0(request)
} else {
self.eventLoop.execute {
self.cancelRequest0(request)
func cancelRequest(_ request: HTTPExecutableRequest) {
self.loopBound.execute {
$0.cancelRequest0(request)
}
}
}
Expand Down
64 changes: 40 additions & 24 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import NIOCore
import NIOHTTP1
import NIOHTTPCompression

protocol HTTP1ConnectionDelegate {
func http1ConnectionReleased(_: HTTP1Connection)
func http1ConnectionClosed(_: HTTP1Connection)
protocol HTTP1ConnectionDelegate: Sendable {
func http1ConnectionReleased(_: HTTPConnectionPool.Connection.ID)
func http1ConnectionClosed(_: HTTPConnectionPool.Connection.ID)
}

final class HTTP1Connection {
Expand Down Expand Up @@ -67,40 +67,53 @@ final class HTTP1Connection {
return connection
}

func executeRequest(_ request: HTTPExecutableRequest) {
if self.channel.eventLoop.inEventLoop {
self.execute0(request: request)
} else {
self.channel.eventLoop.execute {
self.execute0(request: request)
var sendableView: SendableView {
SendableView(self)
}

struct SendableView: Sendable {
private let connection: NIOLoopBound<HTTP1Connection>
let channel: Channel
let id: HTTPConnectionPool.Connection.ID
private var eventLoop: EventLoop { self.connection.eventLoop }

init(_ connection: HTTP1Connection) {
self.connection = NIOLoopBound(connection, eventLoop: connection.channel.eventLoop)
self.id = connection.id
self.channel = connection.channel
}

func executeRequest(_ request: HTTPExecutableRequest) {
self.connection.execute {
$0.execute0(request: request)
}
}
}

func shutdown() {
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}
func shutdown() {
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}

func close(promise: EventLoopPromise<Void>?) {
self.channel.close(mode: .all, promise: promise)
}
func close(promise: EventLoopPromise<Void>?) {
self.channel.close(mode: .all, promise: promise)
}

func close() -> EventLoopFuture<Void> {
let promise = self.channel.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
func close() -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
}
}

func taskCompleted() {
self.delegate.http1ConnectionReleased(self)
self.delegate.http1ConnectionReleased(self.id)
}

private func execute0(request: HTTPExecutableRequest) {
guard self.channel.isActive else {
return request.fail(ChannelError.ioOnClosedChannel)
}

self.channel.write(request, promise: nil)
self.channel.pipeline.syncOperations.write(NIOAny(request), promise: nil)
}

private func start(decompression: HTTPClient.Decompression, logger: Logger) throws {
Expand All @@ -111,9 +124,9 @@ final class HTTP1Connection {
}

self.state = .active
self.channel.closeFuture.whenComplete { _ in
self.channel.closeFuture.assumeIsolated().whenComplete { _ in
self.state = .closed
self.delegate.http1ConnectionClosed(self)
self.delegate.http1ConnectionClosed(self.id)
}

do {
Expand Down Expand Up @@ -150,3 +163,6 @@ final class HTTP1Connection {
}
}
}

@available(*, unavailable)
extension HTTP1Connection: Sendable {}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
self.runTimeoutAction(timeoutAction, context: context)
}

request.willExecuteRequest(self)
request.willExecuteRequest(self.requestExecutor)

let action = self.state.startRequest(
head: request.requestHead,
Expand Down Expand Up @@ -313,7 +313,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.")

let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -326,7 +326,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {

self.currentIdleReadTimeoutTimerID &+= 1
let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -349,7 +349,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
assert(self.idleWriteTimeoutTimer == nil, "Expected there is no timeout timer so far.")

let timerID = self.currentIdleWriteTimeoutTimerID
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
let action = self.state.idleWriteTimeoutTriggered()
self.run(action, context: context)
Expand All @@ -361,7 +361,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {

self.currentIdleWriteTimeoutTimerID &+= 1
let timerID = self.currentIdleWriteTimeoutTimerID
self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) {
guard self.currentIdleWriteTimeoutTimerID == timerID else { return }
let action = self.state.idleWriteTimeoutTriggered()
self.run(action, context: context)
Expand Down Expand Up @@ -437,43 +437,39 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
@available(*, unavailable)
extension HTTP2ClientRequestHandler: Sendable {}

extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.writeRequestBodyPart0(data, request: request, promise: promise)
} else {
self.eventLoop.execute {
self.writeRequestBodyPart0(data, request: request, promise: promise)
extension HTTP2ClientRequestHandler {
var requestExecutor: RequestExecutor {
RequestExecutor(self)
}

struct RequestExecutor: HTTPRequestExecutor, Sendable {
private let loopBound: NIOLoopBound<HTTP2ClientRequestHandler>

init(_ handler: HTTP2ClientRequestHandler) {
self.loopBound = NIOLoopBound(handler, eventLoop: handler.eventLoop)
}

func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
self.loopBound.execute {
$0.writeRequestBodyPart0(data, request: request, promise: promise)
}
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.finishRequestBodyStream0(request, promise: promise)
} else {
self.eventLoop.execute {
self.finishRequestBodyStream0(request, promise: promise)
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
self.loopBound.execute {
$0.finishRequestBodyStream0(request, promise: promise)
}
}
}

func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.demandResponseBodyStream0(request)
} else {
self.eventLoop.execute {
self.demandResponseBodyStream0(request)
func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
self.loopBound.execute {
$0.demandResponseBodyStream0(request)
}
}
}

func cancelRequest(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.cancelRequest0(request)
} else {
self.eventLoop.execute {
self.cancelRequest0(request)
func cancelRequest(_ request: HTTPExecutableRequest) {
self.loopBound.execute {
$0.cancelRequest0(request)
}
}
}
Expand Down
Loading
Loading