Skip to content

Commit c15b42a

Browse files
authored
Merge branch 'main' into fs-task-logger-access-level
2 parents 1e901b9 + 2adca4b commit c15b42a

35 files changed

+993
-133
lines changed

Package.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ let package = Package(
2727
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.10.0"),
2828
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.11.4"),
2929
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
30+
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
3031
],
3132
targets: [
3233
.target(name: "CAsyncHTTPClient"),
@@ -46,6 +47,7 @@ let package = Package(
4647
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
4748
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
4849
.product(name: "Logging", package: "swift-log"),
50+
.product(name: "Atomics", package: "swift-atomics"),
4951
]
5052
),
5153
.testTarget(
@@ -61,6 +63,11 @@ let package = Package(
6163
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
6264
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
6365
.product(name: "Logging", package: "swift-log"),
66+
.product(name: "Atomics", package: "swift-atomics"),
67+
],
68+
resources: [
69+
.copy("Resources/self_signed_cert.pem"),
70+
.copy("Resources/self_signed_key.pem"),
6471
]
6572
),
6673
]

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,14 +261,25 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
261261
case .close:
262262
context.close(promise: nil)
263263
oldRequest.succeedRequest(buffer)
264-
case .sendRequestEnd(let writePromise):
264+
case .sendRequestEnd(let writePromise, let shouldClose):
265265
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
266266
// We need to defer succeeding the old request to avoid ordering issues
267-
writePromise.futureResult.whenComplete { result in
267+
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
268268
switch result {
269269
case .success:
270+
// If our final action was `sendRequestEnd`, that means we've already received
271+
// the complete response. As a result, once we've uploaded all the body parts
272+
// we need to tell the pool that the connection is idle or, if we were asked to
273+
// close when we're done, send the close. Either way, we then succeed the request
274+
if shouldClose {
275+
context.close(promise: nil)
276+
} else {
277+
self.connection.taskCompleted()
278+
}
279+
270280
oldRequest.succeedRequest(buffer)
271281
case .failure(let error):
282+
context.close(promise: nil)
272283
oldRequest.fail(error)
273284
}
274285
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ struct HTTP1ConnectionStateMachine {
3535
/// as soon as we wrote the request end onto the wire.
3636
///
3737
/// The promise is an optional write promise.
38-
case sendRequestEnd(EventLoopPromise<Void>?)
38+
///
39+
/// `shouldClose` records whether we have attached a Connection: close header to this request, and so the connection should
40+
/// be terminated
41+
case sendRequestEnd(EventLoopPromise<Void>?, shouldClose: Bool)
3942
/// Inform an observer that the connection has become idle
4043
case informConnectionIsIdle
4144
}
@@ -412,7 +415,8 @@ extension HTTP1ConnectionStateMachine.State {
412415
self = .closing
413416
newFinalAction = .close
414417
case .sendRequestEnd(let writePromise):
415-
newFinalAction = .sendRequestEnd(writePromise)
418+
self = .idle
419+
newFinalAction = .sendRequestEnd(writePromise, shouldClose: close)
416420
case .none:
417421
self = .idle
418422
newFinalAction = close ? .close : .informConnectionIsIdle

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ protocol HTTPConnectionRequester {
4747
func http1ConnectionCreated(_: HTTP1Connection)
4848
func http2ConnectionCreated(_: HTTP2Connection, maximumStreams: Int)
4949
func failedToCreateHTTPConnection(_: HTTPConnectionPool.Connection.ID, error: Error)
50+
func waitingForConnectivity(_: HTTPConnectionPool.Connection.ID, error: Error)
5051
}
5152

5253
extension HTTPConnectionPool.ConnectionFactory {
@@ -62,7 +63,7 @@ extension HTTPConnectionPool.ConnectionFactory {
6263
var logger = logger
6364
logger[metadataKey: "ahc-connection-id"] = "\(connectionID)"
6465

65-
self.makeChannel(connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).whenComplete { result in
66+
self.makeChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).whenComplete { result in
6667
switch result {
6768
case .success(.http1_1(let channel)):
6869
do {
@@ -104,13 +105,15 @@ extension HTTPConnectionPool.ConnectionFactory {
104105
case http2(Channel)
105106
}
106107

107-
func makeHTTP1Channel(
108+
func makeHTTP1Channel<Requester: HTTPConnectionRequester>(
109+
requester: Requester,
108110
connectionID: HTTPConnectionPool.Connection.ID,
109111
deadline: NIODeadline,
110112
eventLoop: EventLoop,
111113
logger: Logger
112114
) -> EventLoopFuture<Channel> {
113115
self.makeChannel(
116+
requester: requester,
114117
connectionID: connectionID,
115118
deadline: deadline,
116119
eventLoop: eventLoop,
@@ -137,7 +140,8 @@ extension HTTPConnectionPool.ConnectionFactory {
137140
}
138141
}
139142

140-
func makeChannel(
143+
func makeChannel<Requester: HTTPConnectionRequester>(
144+
requester: Requester,
141145
connectionID: HTTPConnectionPool.Connection.ID,
142146
deadline: NIODeadline,
143147
eventLoop: EventLoop,
@@ -150,6 +154,7 @@ extension HTTPConnectionPool.ConnectionFactory {
150154
case .socks:
151155
channelFuture = self.makeSOCKSProxyChannel(
152156
proxy,
157+
requester: requester,
153158
connectionID: connectionID,
154159
deadline: deadline,
155160
eventLoop: eventLoop,
@@ -158,14 +163,15 @@ extension HTTPConnectionPool.ConnectionFactory {
158163
case .http:
159164
channelFuture = self.makeHTTPProxyChannel(
160165
proxy,
166+
requester: requester,
161167
connectionID: connectionID,
162168
deadline: deadline,
163169
eventLoop: eventLoop,
164170
logger: logger
165171
)
166172
}
167173
} else {
168-
channelFuture = self.makeNonProxiedChannel(deadline: deadline, eventLoop: eventLoop, logger: logger)
174+
channelFuture = self.makeNonProxiedChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger)
169175
}
170176

171177
// let's map `ChannelError.connectTimeout` into a `HTTPClientError.connectTimeout`
@@ -179,30 +185,38 @@ extension HTTPConnectionPool.ConnectionFactory {
179185
}
180186
}
181187

182-
private func makeNonProxiedChannel(
188+
private func makeNonProxiedChannel<Requester: HTTPConnectionRequester>(
189+
requester: Requester,
190+
connectionID: HTTPConnectionPool.Connection.ID,
183191
deadline: NIODeadline,
184192
eventLoop: EventLoop,
185193
logger: Logger
186194
) -> EventLoopFuture<NegotiatedProtocol> {
187195
switch self.key.scheme {
188196
case .http, .httpUnix, .unix:
189-
return self.makePlainChannel(deadline: deadline, eventLoop: eventLoop).map { .http1_1($0) }
197+
return self.makePlainChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop).map { .http1_1($0) }
190198
case .https, .httpsUnix:
191-
return self.makeTLSChannel(deadline: deadline, eventLoop: eventLoop, logger: logger).flatMapThrowing {
199+
return self.makeTLSChannel(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop, logger: logger).flatMapThrowing {
192200
channel, negotiated in
193201

194202
try self.matchALPNToHTTPVersion(negotiated, channel: channel)
195203
}
196204
}
197205
}
198206

199-
private func makePlainChannel(deadline: NIODeadline, eventLoop: EventLoop) -> EventLoopFuture<Channel> {
207+
private func makePlainChannel<Requester: HTTPConnectionRequester>(
208+
requester: Requester,
209+
connectionID: HTTPConnectionPool.Connection.ID,
210+
deadline: NIODeadline,
211+
eventLoop: EventLoop
212+
) -> EventLoopFuture<Channel> {
200213
precondition(!self.key.scheme.usesTLS, "Unexpected scheme")
201-
return self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop).connect(target: self.key.connectionTarget)
214+
return self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop).connect(target: self.key.connectionTarget)
202215
}
203216

204-
private func makeHTTPProxyChannel(
217+
private func makeHTTPProxyChannel<Requester: HTTPConnectionRequester>(
205218
_ proxy: HTTPClient.Configuration.Proxy,
219+
requester: Requester,
206220
connectionID: HTTPConnectionPool.Connection.ID,
207221
deadline: NIODeadline,
208222
eventLoop: EventLoop,
@@ -211,7 +225,7 @@ extension HTTPConnectionPool.ConnectionFactory {
211225
// A proxy connection starts with a plain text connection to the proxy server. After
212226
// the connection has been established with the proxy server, the connection might be
213227
// upgraded to TLS before we send our first request.
214-
let bootstrap = self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop)
228+
let bootstrap = self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop)
215229
return bootstrap.connect(host: proxy.host, port: proxy.port).flatMap { channel in
216230
let encoder = HTTPRequestEncoder()
217231
let decoder = ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .dropBytes))
@@ -243,8 +257,9 @@ extension HTTPConnectionPool.ConnectionFactory {
243257
}
244258
}
245259

246-
private func makeSOCKSProxyChannel(
260+
private func makeSOCKSProxyChannel<Requester: HTTPConnectionRequester>(
247261
_ proxy: HTTPClient.Configuration.Proxy,
262+
requester: Requester,
248263
connectionID: HTTPConnectionPool.Connection.ID,
249264
deadline: NIODeadline,
250265
eventLoop: EventLoop,
@@ -253,7 +268,7 @@ extension HTTPConnectionPool.ConnectionFactory {
253268
// A proxy connection starts with a plain text connection to the proxy server. After
254269
// the connection has been established with the proxy server, the connection might be
255270
// upgraded to TLS before we send our first request.
256-
let bootstrap = self.makePlainBootstrap(deadline: deadline, eventLoop: eventLoop)
271+
let bootstrap = self.makePlainBootstrap(requester: requester, connectionID: connectionID, deadline: deadline, eventLoop: eventLoop)
257272
return bootstrap.connect(host: proxy.host, port: proxy.port).flatMap { channel in
258273
let socksConnectHandler = SOCKSClientHandler(targetAddress: SOCKSAddress(self.key.connectionTarget))
259274
let socksEventHandler = SOCKSEventsHandler(deadline: deadline)
@@ -331,14 +346,21 @@ extension HTTPConnectionPool.ConnectionFactory {
331346
}
332347
}
333348

334-
private func makePlainBootstrap(deadline: NIODeadline, eventLoop: EventLoop) -> NIOClientTCPBootstrapProtocol {
349+
private func makePlainBootstrap<Requester: HTTPConnectionRequester>(
350+
requester: Requester,
351+
connectionID: HTTPConnectionPool.Connection.ID,
352+
deadline: NIODeadline,
353+
eventLoop: EventLoop
354+
) -> NIOClientTCPBootstrapProtocol {
335355
#if canImport(Network)
336356
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), let tsBootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoop) {
337357
return tsBootstrap
358+
.channelOption(NIOTSChannelOptions.waitForActivity, value: self.clientConfiguration.networkFrameworkWaitForConnectivity)
338359
.connectTimeout(deadline - NIODeadline.now())
339360
.channelInitializer { channel in
340361
do {
341362
try channel.pipeline.syncOperations.addHandler(HTTPClient.NWErrorHandler())
363+
try channel.pipeline.syncOperations.addHandler(NWWaitingHandler(requester: requester, connectionID: connectionID))
342364
return channel.eventLoop.makeSucceededVoidFuture()
343365
} catch {
344366
return channel.eventLoop.makeFailedFuture(error)
@@ -355,9 +377,17 @@ extension HTTPConnectionPool.ConnectionFactory {
355377
preconditionFailure("No matching bootstrap found")
356378
}
357379

358-
private func makeTLSChannel(deadline: NIODeadline, eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<(Channel, String?)> {
380+
private func makeTLSChannel<Requester: HTTPConnectionRequester>(
381+
requester: Requester,
382+
connectionID: HTTPConnectionPool.Connection.ID,
383+
deadline: NIODeadline,
384+
eventLoop: EventLoop,
385+
logger: Logger
386+
) -> EventLoopFuture<(Channel, String?)> {
359387
precondition(self.key.scheme.usesTLS, "Unexpected scheme")
360388
let bootstrapFuture = self.makeTLSBootstrap(
389+
requester: requester,
390+
connectionID: connectionID,
361391
deadline: deadline,
362392
eventLoop: eventLoop,
363393
logger: logger
@@ -387,8 +417,13 @@ extension HTTPConnectionPool.ConnectionFactory {
387417
return channelFuture
388418
}
389419

390-
private func makeTLSBootstrap(deadline: NIODeadline, eventLoop: EventLoop, logger: Logger)
391-
-> EventLoopFuture<NIOClientTCPBootstrapProtocol> {
420+
private func makeTLSBootstrap<Requester: HTTPConnectionRequester>(
421+
requester: Requester,
422+
connectionID: HTTPConnectionPool.Connection.ID,
423+
deadline: NIODeadline,
424+
eventLoop: EventLoop,
425+
logger: Logger
426+
) -> EventLoopFuture<NIOClientTCPBootstrapProtocol> {
392427
var tlsConfig = self.tlsConfiguration
393428
switch self.clientConfiguration.httpVersion.configuration {
394429
case .automatic:
@@ -408,11 +443,13 @@ extension HTTPConnectionPool.ConnectionFactory {
408443
options -> NIOClientTCPBootstrapProtocol in
409444

410445
tsBootstrap
446+
.channelOption(NIOTSChannelOptions.waitForActivity, value: self.clientConfiguration.networkFrameworkWaitForConnectivity)
411447
.connectTimeout(deadline - NIODeadline.now())
412448
.tlsOptions(options)
413449
.channelInitializer { channel in
414450
do {
415451
try channel.pipeline.syncOperations.addHandler(HTTPClient.NWErrorHandler())
452+
try channel.pipeline.syncOperations.addHandler(NWWaitingHandler(requester: requester, connectionID: connectionID))
416453
// we don't need to set a TLS deadline for NIOTS connections, since the
417454
// TLS handshake is part of the TS connection bootstrap. If the TLS
418455
// handshake times out the complete connection creation will be failed.

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Atomics
1516
import Logging
1617
import NIOConcurrencyHelpers
1718
import NIOCore
@@ -165,14 +166,14 @@ extension HTTPConnectionPool.Connection.ID {
165166
static var globalGenerator = Generator()
166167

167168
struct Generator {
168-
private let atomic: NIOAtomic<Int>
169+
private let atomic: ManagedAtomic<Int>
169170

170171
init() {
171-
self.atomic = .makeAtomic(value: 0)
172+
self.atomic = .init(0)
172173
}
173174

174175
func next() -> Int {
175-
return self.atomic.add(1)
176+
return self.atomic.loadThenWrappingIncrement(ordering: .relaxed)
176177
}
177178
}
178179
}

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,6 @@ final class HTTPConnectionPool {
147147
self.unlocked = Unlocked(connection: .none, request: .none)
148148

149149
switch stateMachineAction.request {
150-
case .cancelRequestTimeout(let requestID):
151-
self.locked.request = .cancelRequestTimeout(requestID)
152150
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
153151
if cancelTimeout {
154152
self.locked.request = .cancelRequestTimeout(request.id)
@@ -467,6 +465,16 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
467465
$0.failedToCreateNewConnection(error, connectionID: connectionID)
468466
}
469467
}
468+
469+
func waitingForConnectivity(_ connectionID: HTTPConnectionPool.Connection.ID, error: Error) {
470+
self.logger.debug("waiting for connectivity", metadata: [
471+
"ahc-error": "\(error)",
472+
"ahc-connection-id": "\(connectionID)",
473+
])
474+
self.modifyStateAndRunActions {
475+
$0.waitingForConnectivity(error, connectionID: connectionID)
476+
}
477+
}
470478
}
471479

472480
extension HTTPConnectionPool: HTTP1ConnectionDelegate {

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ extension HTTPConnectionPool {
241241
}
242242
}
243243

244+
mutating func waitingForConnectivity(_ error: Error, connectionID: Connection.ID) -> Action {
245+
self.lastConnectFailure = error
246+
247+
return .init(request: .none, connection: .none)
248+
}
249+
244250
mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
245251
switch self.lifecycleState {
246252
case .running:
@@ -317,9 +323,11 @@ extension HTTPConnectionPool {
317323

318324
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
319325
// 1. check requests in queue
320-
if self.requests.remove(requestID) != nil {
326+
if let request = self.requests.remove(requestID) {
327+
// Use the last connection error to let the user know why the request was never scheduled
328+
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
321329
return .init(
322-
request: .cancelRequestTimeout(requestID),
330+
request: .failRequest(request, error, cancelTimeout: true),
323331
connection: .none
324332
)
325333
}

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,11 @@ extension HTTPConnectionPool {
406406
return .init(request: .none, connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop))
407407
}
408408

409+
mutating func waitingForConnectivity(_ error: Error, connectionID: Connection.ID) -> Action {
410+
self.lastConnectFailure = error
411+
return .init(request: .none, connection: .none)
412+
}
413+
409414
mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
410415
// The naming of `failConnection` is a little confusing here. All it does is moving the
411416
// connection state from `.backingOff` to `.closed` here. It also returns the
@@ -439,9 +444,11 @@ extension HTTPConnectionPool {
439444

440445
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
441446
// 1. check requests in queue
442-
if self.requests.remove(requestID) != nil {
447+
if let request = self.requests.remove(requestID) {
448+
// Use the last connection error to let the user know why the request was never scheduled
449+
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
443450
return .init(
444-
request: .cancelRequestTimeout(requestID),
451+
request: .failRequest(request, error, cancelTimeout: true),
445452
connection: .none
446453
)
447454
}

0 commit comments

Comments
 (0)