Skip to content

Commit 03f4234

Browse files
committed
Fix HTTP2StreamChannel leak
1 parent 59bfb96 commit 03f4234

File tree

5 files changed

+88
-9
lines changed

5 files changed

+88
-9
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,21 +239,21 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
239239

240240
private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {
241241
switch action {
242-
case .close:
243-
context.close(promise: nil)
242+
case .close, .none:
243+
break
244244

245245
case .sendRequestEnd(let writePromise):
246246
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
247-
248-
case .none:
249-
break
250247
}
251248
}
252249

253250
private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) {
251+
// We must close the http2 stream after the request has finished. This breaks a reference
252+
// cycle in HTTP2Connection.
253+
context.close(promise: nil)
254+
254255
switch action {
255256
case .close(let writePromise):
256-
context.close(promise: nil)
257257
writePromise?.fail(error)
258258

259259
case .none:

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ final class HTTP2Connection {
7777

7878
/// We use this channel set to remember, which open streams we need to inform that
7979
/// we want to close the connection. The channels shall than cancel their currently running
80-
/// request.
80+
/// request. This property must only be accessed from the connections `EventLoop`.
8181
private var openStreams = Set<ChannelBox>()
8282
let id: HTTPConnectionPool.Connection.ID
8383
let decompression: HTTPClient.Decompression
@@ -241,7 +241,7 @@ final class HTTP2Connection {
241241
// before.
242242
let box = ChannelBox(channel)
243243
self.openStreams.insert(box)
244-
self.channel.closeFuture.whenComplete { _ in
244+
channel.closeFuture.whenComplete { _ in
245245
self.openStreams.remove(box)
246246
}
247247

@@ -287,6 +287,11 @@ final class HTTP2Connection {
287287
preconditionFailure("invalid state \(self.state)")
288288
}
289289
}
290+
291+
func __forTesting_getStreamChannels() -> [Channel] {
292+
self.channel.eventLoop.preconditionInEventLoop()
293+
return self.openStreams.map { $0.channel }
294+
}
290295
}
291296

292297
extension HTTP2Connection: HTTP2IdleHandlerDelegate {

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,14 +335,15 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
335335

336336
// the handler only writes once the channel is writable
337337
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
338+
XCTAssertTrue(embedded.isActive)
338339
embedded.isWritable = true
339340
embedded.pipeline.fireChannelWritabilityChanged()
340341

341342
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
342343
XCTAssertEqual($0 as? WriteError, WriteError())
343344
}
344345

345-
XCTAssertEqual(embedded.isActive, false)
346+
XCTAssertFalse(embedded.isActive)
346347
}
347348
}
348349

Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extension HTTP2ConnectionTests {
3030
("testSimpleGetRequest", testSimpleGetRequest),
3131
("testEveryDoneRequestLeadsToAStreamAvailableCall", testEveryDoneRequestLeadsToAStreamAvailableCall),
3232
("testCancelAllRunningRequests", testCancelAllRunningRequests),
33+
("testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone", testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone),
3334
]
3435
}
3536
}

Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,78 @@ class HTTP2ConnectionTests: XCTestCase {
243243

244244
XCTAssertNoThrow(try http2Connection.closeFuture.wait())
245245
}
246+
247+
func testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone() {
248+
class SucceedPromiseOnRequestHandler: ChannelInboundHandler {
249+
typealias InboundIn = HTTPServerRequestPart
250+
typealias OutboundOut = HTTPServerResponsePart
251+
252+
let promise: EventLoopPromise<Void>
253+
254+
init(promise: EventLoopPromise<Void>) { self.promise = promise }
255+
256+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
257+
self.promise.succeed(())
258+
switch self.unwrapInboundIn(data) {
259+
case .head:
260+
context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil)
261+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
262+
case .body, .end:
263+
break
264+
}
265+
}
266+
}
267+
268+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
269+
let eventLoop = eventLoopGroup.next()
270+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
271+
272+
let serverReceivedRequestPromise = eventLoop.makePromise(of: Void.self)
273+
let httpBin = HTTPBin(.http2(compress: false)) { _ in
274+
SucceedPromiseOnRequestHandler(promise: serverReceivedRequestPromise)
275+
}
276+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
277+
278+
let connectionCreator = TestConnectionCreator()
279+
let delegate = TestHTTP2ConnectionDelegate()
280+
var maybeHTTP2Connection: HTTP2Connection?
281+
XCTAssertNoThrow(maybeHTTP2Connection = try connectionCreator.createHTTP2Connection(
282+
to: httpBin.port,
283+
delegate: delegate,
284+
on: eventLoop
285+
))
286+
guard let http2Connection = maybeHTTP2Connection else {
287+
return XCTFail("Expected to have an HTTP2 connection here.")
288+
}
289+
290+
var maybeRequest: HTTPClient.Request?
291+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
292+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)"))
293+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
294+
request: XCTUnwrap(maybeRequest),
295+
eventLoopPreference: .indifferent,
296+
task: .init(eventLoop: eventLoop, logger: .init(label: "test")),
297+
redirectHandler: nil,
298+
connectionDeadline: .distantFuture,
299+
requestOptions: .forTests(),
300+
delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest))
301+
))
302+
guard let requestBag = maybeRequestBag else {
303+
return XCTFail("Expected to have a request bag at this point")
304+
}
305+
306+
http2Connection.executeRequest(requestBag)
307+
308+
XCTAssertNoThrow(try serverReceivedRequestPromise.futureResult.wait())
309+
var channelCount: Int?
310+
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
311+
XCTAssertEqual(channelCount, 1)
312+
313+
XCTAssertNoThrow(try requestBag.task.futureResult.wait())
314+
315+
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
316+
XCTAssertEqual(channelCount, 0)
317+
}
246318
}
247319

248320
class TestConnectionCreator {

0 commit comments

Comments
 (0)