Skip to content

Commit e264599

Browse files
authored
Fix HTTP2StreamChannel leak (#657)
* Fix HTTP2StreamChannel leak * Update code comments.
1 parent 9401037 commit e264599

File tree

5 files changed

+113
-9
lines changed

5 files changed

+113
-9
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,21 +237,25 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
237237

238238
private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {
239239
switch action {
240-
case .close:
241-
context.close(promise: nil)
240+
case .close, .none:
241+
// The actions returned here come from an `HTTPRequestStateMachine` that assumes http/1.1
242+
// semantics. For this reason we can ignore the close here, since an h2 stream is closed
243+
// after every request anyway.
244+
break
242245

243246
case .sendRequestEnd(let writePromise):
244247
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
245-
246-
case .none:
247-
break
248248
}
249249
}
250250

251251
private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) {
252+
// We must close the http2 stream after the request has finished. Since the request failed,
253+
// we have no idea what the h2 streams state was. To be on the save side, we explicitly close
254+
// the h2 stream. This will break a reference cycle in HTTP2Connection.
255+
context.close(promise: nil)
256+
252257
switch action {
253258
case .close(let writePromise):
254-
context.close(promise: nil)
255259
writePromise?.fail(error)
256260

257261
case .none:

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ final class HTTP2Connection {
7777

7878
/// We use this channel set to remember, which open streams we need to inform that
7979
/// we want to close the connection. The channels shall than cancel their currently running
80-
/// request.
80+
/// request. This property must only be accessed from the connections `EventLoop`.
8181
private var openStreams = Set<ChannelBox>()
8282
let id: HTTPConnectionPool.Connection.ID
8383
let decompression: HTTPClient.Decompression
@@ -241,7 +241,7 @@ final class HTTP2Connection {
241241
// before.
242242
let box = ChannelBox(channel)
243243
self.openStreams.insert(box)
244-
self.channel.closeFuture.whenComplete { _ in
244+
channel.closeFuture.whenComplete { _ in
245245
self.openStreams.remove(box)
246246
}
247247

@@ -287,6 +287,11 @@ final class HTTP2Connection {
287287
preconditionFailure("invalid state \(self.state)")
288288
}
289289
}
290+
291+
func __forTesting_getStreamChannels() -> [Channel] {
292+
self.channel.eventLoop.preconditionInEventLoop()
293+
return self.openStreams.map { $0.channel }
294+
}
290295
}
291296

292297
extension HTTP2Connection: HTTP2IdleHandlerDelegate {

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,14 +335,15 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
335335

336336
// the handler only writes once the channel is writable
337337
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
338+
XCTAssertTrue(embedded.isActive)
338339
embedded.isWritable = true
339340
embedded.pipeline.fireChannelWritabilityChanged()
340341

341342
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
342343
XCTAssertEqual($0 as? WriteError, WriteError())
343344
}
344345

345-
XCTAssertEqual(embedded.isActive, false)
346+
XCTAssertFalse(embedded.isActive)
346347
}
347348
}
348349

Tests/AsyncHTTPClientTests/HTTP2ConnectionTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extension HTTP2ConnectionTests {
3030
("testSimpleGetRequest", testSimpleGetRequest),
3131
("testEveryDoneRequestLeadsToAStreamAvailableCall", testEveryDoneRequestLeadsToAStreamAvailableCall),
3232
("testCancelAllRunningRequests", testCancelAllRunningRequests),
33+
("testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone", testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone),
3334
]
3435
}
3536
}

Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,99 @@ class HTTP2ConnectionTests: XCTestCase {
243243

244244
XCTAssertNoThrow(try http2Connection.closeFuture.wait())
245245
}
246+
247+
func testChildStreamsAreRemovedFromTheOpenChannelListOnceTheRequestIsDone() {
248+
class SucceedPromiseOnRequestHandler: ChannelInboundHandler {
249+
typealias InboundIn = HTTPServerRequestPart
250+
typealias OutboundOut = HTTPServerResponsePart
251+
252+
let dataArrivedPromise: EventLoopPromise<Void>
253+
let triggerResponseFuture: EventLoopFuture<Void>
254+
255+
init(dataArrivedPromise: EventLoopPromise<Void>, triggerResponseFuture: EventLoopFuture<Void>) {
256+
self.dataArrivedPromise = dataArrivedPromise
257+
self.triggerResponseFuture = triggerResponseFuture
258+
}
259+
260+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
261+
self.dataArrivedPromise.succeed(())
262+
263+
self.triggerResponseFuture.hop(to: context.eventLoop).whenSuccess {
264+
switch self.unwrapInboundIn(data) {
265+
case .head:
266+
context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil)
267+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
268+
case .body, .end:
269+
break
270+
}
271+
}
272+
}
273+
}
274+
275+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
276+
let eventLoop = eventLoopGroup.next()
277+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
278+
279+
let serverReceivedRequestPromise = eventLoop.makePromise(of: Void.self)
280+
let triggerResponsePromise = eventLoop.makePromise(of: Void.self)
281+
let httpBin = HTTPBin(.http2(compress: false)) { _ in
282+
SucceedPromiseOnRequestHandler(
283+
dataArrivedPromise: serverReceivedRequestPromise,
284+
triggerResponseFuture: triggerResponsePromise.futureResult
285+
)
286+
}
287+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
288+
289+
let connectionCreator = TestConnectionCreator()
290+
let delegate = TestHTTP2ConnectionDelegate()
291+
var maybeHTTP2Connection: HTTP2Connection?
292+
XCTAssertNoThrow(maybeHTTP2Connection = try connectionCreator.createHTTP2Connection(
293+
to: httpBin.port,
294+
delegate: delegate,
295+
on: eventLoop
296+
))
297+
guard let http2Connection = maybeHTTP2Connection else {
298+
return XCTFail("Expected to have an HTTP2 connection here.")
299+
}
300+
301+
var maybeRequest: HTTPClient.Request?
302+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
303+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)"))
304+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
305+
request: XCTUnwrap(maybeRequest),
306+
eventLoopPreference: .indifferent,
307+
task: .init(eventLoop: eventLoop, logger: .init(label: "test")),
308+
redirectHandler: nil,
309+
connectionDeadline: .distantFuture,
310+
requestOptions: .forTests(),
311+
delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest))
312+
))
313+
guard let requestBag = maybeRequestBag else {
314+
return XCTFail("Expected to have a request bag at this point")
315+
}
316+
317+
http2Connection.executeRequest(requestBag)
318+
319+
XCTAssertNoThrow(try serverReceivedRequestPromise.futureResult.wait())
320+
var channelCount: Int?
321+
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
322+
XCTAssertEqual(channelCount, 1)
323+
triggerResponsePromise.succeed(())
324+
325+
XCTAssertNoThrow(try requestBag.task.futureResult.wait())
326+
327+
// this is racy. for this reason we allow a couple of tries
328+
var retryCount = 0
329+
let maxRetries = 1000
330+
while retryCount < maxRetries {
331+
XCTAssertNoThrow(channelCount = try eventLoop.submit { http2Connection.__forTesting_getStreamChannels().count }.wait())
332+
if channelCount == 0 {
333+
break
334+
}
335+
retryCount += 1
336+
}
337+
XCTAssertLessThan(retryCount, maxRetries)
338+
}
246339
}
247340

248341
class TestConnectionCreator {

0 commit comments

Comments
 (0)