Skip to content

Fix crash when receiving 2xx response before stream is complete. #591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
writePromise.futureResult.whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle.
self.connection.taskCompleted()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a comment here? Something like:

If our final action is sendRequestEnd this means, that we have already received the complete response. For this reason we must mark the connection as idle to the pool, once we have uploaded all body parts.

oldRequest.succeedRequest(buffer)
case .failure(let error):
oldRequest.fail(error)
Expand Down
12 changes: 8 additions & 4 deletions Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,14 @@ extension RequestBag.StateMachine {
self.state = .executing(executor, requestState, .buffering(currentBuffer, next: next))
return .none
case .executing(let executor, let requestState, .waitingForRemote):
var buffer = buffer
let first = buffer.removeFirst()
self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore))
return .forwardResponsePart(first)
if buffer.count > 0 {
var buffer = buffer
let first = buffer.removeFirst()
self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore))
return .forwardResponsePart(first)
} else {
return .none
}
case .redirected(let executor, var receivedBytes, let head, let redirectURL):
let partsLength = buffer.reduce(into: 0) { $0 += $1.readableBytes }
receivedBytes += partsLength
Expand Down
31 changes: 31 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,37 @@ class HTTPEchoHandler: ChannelInboundHandler {
}
}

final class HTTP200DelayedHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

var pendingBodyParts: Int?

init(bodyPartsBeforeResponse: Int) {
self.pendingBodyParts = bodyPartsBeforeResponse
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let request = self.unwrapInboundIn(data)
switch request {
case .head:
break
case .body:
if let pendingBodyParts = self.pendingBodyParts {
if pendingBodyParts > 0 {
self.pendingBodyParts = pendingBodyParts - 1
} else {
self.pendingBodyParts = nil
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
case .end:
break
}
}
}

private let cert = """
-----BEGIN CERTIFICATE-----
MIICmDCCAYACCQCPC8JDqMh1zzANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1
Expand Down
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ extension HTTPClientTests {
("testSSLHandshakeErrorPropagationDelayedClose", testSSLHandshakeErrorPropagationDelayedClose),
("testWeCloseConnectionsWhenConnectionCloseSetByServer", testWeCloseConnectionsWhenConnectionCloseSetByServer),
("testBiDirectionalStreaming", testBiDirectionalStreaming),
("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200),
("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting),
("testFileDownloadChunked", testFileDownloadChunked),
("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine),
Expand Down
54 changes: 54 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2940,6 +2940,60 @@ class HTTPClientTests: XCTestCase {
XCTAssertNil(try delegate.next().wait())
}

// In this test, we test that a request can continue to stream its body after the response head and end
// was received where the end is a 200.
func testBiDirectionalStreamingEarly200() {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTP200DelayedHandler(bodyPartsBeforeResponse: 1) }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let writeEL = eventLoopGroup.next()
let delegateEL = eventLoopGroup.next()

let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }

let delegate = ResponseStreamDelegate(eventLoop: delegateEL)

let body: HTTPClient.Body = .stream { writer in
let finalPromise = writeEL.makePromise(of: Void.self)

func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
// always invoke from the wrong el to test thread safety
writeEL.preconditionInEventLoop()

if index >= 30 {
return finalPromise.succeed(())
}

let sent = ByteBuffer(integer: index)
writer.write(.byteBuffer(sent)).whenComplete { result in
switch result {
case .success:
writeEL.execute {
writeLoop(writer, index: index + 1)
}

case .failure(let error):
finalPromise.fail(error)
}
}
}

writeEL.execute {
writeLoop(writer, index: 0)
}

return finalPromise.futureResult
}

let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))
XCTAssertNoThrow(try future.wait())
XCTAssertNil(try delegate.next().wait())
}

func testSynchronousHandshakeErrorReporting() throws {
// This only affects cases where we use NIOSSL.
guard !isTestingNIOTS() else { return }
Expand Down