Skip to content

[Redirect] Allow redirect response to have body #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 65 additions & 14 deletions Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,24 @@ import struct Foundation.URL
import NIOCore
import NIOHTTP1

extension HTTPClient {
/// The maximum body size allowed, before a redirect response is cancelled. 3KB.
///
/// Why 3KB? We feel like this is a good compromise between potentially reusing the
/// connection in HTTP/1.1 mode (if we load all data from the redirect response we can
/// reuse the connection) and not being to wasteful in the amount of data that is thrown
/// away being transferred.
fileprivate static let maxBodySizeRedirectResponse = 1024 * 3
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here to explain why it is 3kb?

}

extension RequestBag {
struct StateMachine {
fileprivate enum State {
case initialized
case queued(HTTPRequestScheduler)
case executing(HTTPRequestExecutor, RequestStreamState, ResponseStreamState)
case finished(error: Error?)
case redirected(HTTPResponseHead, URL)
case redirected(HTTPRequestExecutor, Int, HTTPResponseHead, URL)
case modifying
}

Expand Down Expand Up @@ -259,11 +269,18 @@ extension RequestBag.StateMachine {
}
}

enum ReceiveResponseHeadAction {
case none
case forwardResponseHead(HTTPResponseHead)
case signalBodyDemand(HTTPRequestExecutor)
case redirect(HTTPRequestExecutor, RedirectHandler<Delegate.Response>, HTTPResponseHead, URL)
}

/// The response head has been received.
///
/// - Parameter head: The response' head
/// - Returns: Whether the response should be forwarded to the delegate. Will be `false` if the request follows a redirect.
mutating func receiveResponseHead(_ head: HTTPResponseHead) -> Bool {
mutating func receiveResponseHead(_ head: HTTPResponseHead) -> ReceiveResponseHeadAction {
switch self.state {
case .initialized, .queued:
preconditionFailure("How can we receive a response, if the request hasn't started yet.")
Expand All @@ -276,24 +293,40 @@ extension RequestBag.StateMachine {
status: head.status,
responseHeaders: head.headers
) {
self.state = .redirected(head, redirectURL)
return false
// If we will redirect, we need to consume the response's body ASAP, to be able to
// reuse the existing connection. We will consume a response body, if the body is
// smaller than 3kb.
switch head.contentLength {
case .some(0...(HTTPClient.maxBodySizeRedirectResponse)), .none:
self.state = .redirected(executor, 0, head, redirectURL)
return .signalBodyDemand(executor)
case .some:
self.state = .finished(error: HTTPClientError.cancelled)
return .redirect(executor, self.redirectHandler!, head, redirectURL)
}
} else {
self.state = .executing(executor, requestState, .buffering(.init(), next: .askExecutorForMore))
return true
return .forwardResponseHead(head)
}
case .redirected:
preconditionFailure("This state can only be reached after we have received a HTTP head")
case .finished(error: .some):
return false
return .none
case .finished(error: .none):
preconditionFailure("How can the request be finished without error, before receiving response head?")
case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) -> ByteBuffer? {
enum ReceiveResponseBodyAction {
case none
case forwardResponsePart(ByteBuffer)
case signalBodyDemand(HTTPRequestExecutor)
case redirect(HTTPRequestExecutor, RedirectHandler<Delegate.Response>, HTTPResponseHead, URL)
}

mutating func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) -> ReceiveResponseBodyAction {
switch self.state {
case .initialized, .queued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
Expand All @@ -312,17 +345,26 @@ extension RequestBag.StateMachine {
currentBuffer.append(contentsOf: buffer)
}
self.state = .executing(executor, requestState, .buffering(currentBuffer, next: next))
return nil
return .none
case .executing(let executor, let requestState, .waitingForRemote):
var buffer = buffer
let first = buffer.removeFirst()
self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore))
return first
case .redirected:
// ignore body
return nil
return .forwardResponsePart(first)
case .redirected(let executor, var receivedBytes, let head, let redirectURL):
let partsLength = buffer.reduce(into: 0) { $0 += $1.readableBytes }
receivedBytes += partsLength

if receivedBytes > HTTPClient.maxBodySizeRedirectResponse {
self.state = .finished(error: HTTPClientError.cancelled)
return .redirect(executor, self.redirectHandler!, head, redirectURL)
} else {
self.state = .redirected(executor, receivedBytes, head, redirectURL)
return .signalBodyDemand(executor)
}

case .finished(error: .some):
return nil
return .none
case .finished(error: .none):
preconditionFailure("How can the request be finished without error, before receiving response head?")
case .modifying:
Expand Down Expand Up @@ -368,7 +410,7 @@ extension RequestBag.StateMachine {
self.state = .executing(executor, requestState, .buffering(newChunks, next: .eof))
return .consume(first)

case .redirected(let head, let redirectURL):
case .redirected(_, _, let head, let redirectURL):
self.state = .finished(error: nil)
return .redirect(self.redirectHandler!, head, redirectURL)

Expand Down Expand Up @@ -529,3 +571,12 @@ extension RequestBag.StateMachine {
}
}
}

extension HTTPResponseHead {
var contentLength: Int? {
guard let header = self.headers.first(name: "content-length") else {
return nil
}
return Int(header)
}
}
52 changes: 34 additions & 18 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,33 +196,49 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
self.task.eventLoop.assertInEventLoop()

// runs most likely on channel eventLoop
let forwardToDelegate = self.state.receiveResponseHead(head)
switch self.state.receiveResponseHead(head) {
case .none:
break

guard forwardToDelegate else { return }
case .signalBodyDemand(let executor):
executor.demandResponseBodyStream(self)

self.delegate.didReceiveHead(task: self.task, head)
.hop(to: self.task.eventLoop)
.whenComplete { result in
// After the head received, let's start to consume body data
self.consumeMoreBodyData0(resultOfPreviousConsume: result)
}
case .redirect(let executor, let handler, let head, let newURL):
handler.redirect(status: head.status, to: newURL, promise: self.task.promise)
executor.cancelRequest(self)

case .forwardResponseHead(let head):
self.delegate.didReceiveHead(task: self.task, head)
.hop(to: self.task.eventLoop)
.whenComplete { result in
// After the head received, let's start to consume body data
self.consumeMoreBodyData0(resultOfPreviousConsume: result)
}
}
}

private func receiveResponseBodyParts0(_ buffer: CircularBuffer<ByteBuffer>) {
self.task.eventLoop.assertInEventLoop()

let maybeForwardBuffer = self.state.receiveResponseBodyParts(buffer)
switch self.state.receiveResponseBodyParts(buffer) {
case .none:
break

guard let forwardBuffer = maybeForwardBuffer else {
return
}
case .signalBodyDemand(let executor):
executor.demandResponseBodyStream(self)

self.delegate.didReceiveBodyPart(task: self.task, forwardBuffer)
.hop(to: self.task.eventLoop)
.whenComplete { result in
// on task el
self.consumeMoreBodyData0(resultOfPreviousConsume: result)
}
case .redirect(let executor, let handler, let head, let newURL):
handler.redirect(status: head.status, to: newURL, promise: self.task.promise)
executor.cancelRequest(self)

case .forwardResponsePart(let part):
self.delegate.didReceiveBodyPart(task: self.task, part)
.hop(to: self.task.eventLoop)
.whenComplete { result in
// on task el
self.consumeMoreBodyData0(resultOfPreviousConsume: result)
}
}
}

private func succeedRequest0(_ buffer: CircularBuffer<ByteBuffer>?) {
Expand Down
3 changes: 3 additions & 0 deletions Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ extension RequestBagTests {
("testChannelBecomingWritableDoesntCrashCancelledTask", testChannelBecomingWritableDoesntCrashCancelledTask),
("testHTTPUploadIsCancelledEvenThoughRequestSucceeds", testHTTPUploadIsCancelledEvenThoughRequestSucceeds),
("testRaceBetweenConnectionCloseAndDemandMoreData", testRaceBetweenConnectionCloseAndDemandMoreData),
("testRedirectWith3KBBody", testRedirectWith3KBBody),
("testRedirectWith4KBBodyAnnouncedInResponseHead", testRedirectWith4KBBodyAnnouncedInResponseHead),
("testRedirectWith4KBBodyNotAnnouncedInResponseHead", testRedirectWith4KBBodyNotAnnouncedInResponseHead),
]
}
}
Loading