Skip to content

URLSessionTask: implement InputStream #1629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Foundation/URLSession/BodySource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,39 @@ internal enum _BodySourceDataChunk {
case error
}

internal final class _BodyStreamSource {
let inputStream: InputStream

init(inputStream: InputStream) {
self.inputStream = inputStream
}
}

extension _BodyStreamSource : _BodySource {
func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
guard inputStream.hasBytesAvailable else {
return .done
}


let buffer = UnsafeMutableRawBufferPointer.allocate(count: length)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is it deallocated? 🤔

Is it possible to have length > 0 and readBytes <= 0 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ogres thanks for you good catch! 👏

When is it deallocated? 🤔

It looks like a memory leak, and we need to dealloc buffer. We can use DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .free) - it will free a memory and also do not creates a copy of bytes.

Is it possible to have length > 0 and readBytes <= 0 ?

Yes, It can be, and unfortunately, we cannot be sure that data available during all time. Data and socket have different lifecycles so it’s ok that data is not available but the socket is alive and it wants a new chunk of data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ogres I'll try it ASAP, I stuck with building Foundation for OSX.

guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
return .error
}
let readBytes = self.inputStream.read(pointer, maxLength: length)
if readBytes > 0 {
let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer))
return .data(dispatchData.subdata(in: 0 ..< readBytes))
}
else if readBytes == 0 {
return .done
}
else {
return .error
}
}
}

/// A body data source backed by `DispatchData`.
internal final class _BodyDataSource {
var data: DispatchData!
Expand Down
28 changes: 25 additions & 3 deletions Foundation/URLSession/NativeProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,28 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {

func seekInputStream(to position: UInt64) throws {
// We will reset the body source and seek forward.
NSUnimplemented()
guard let session = task?.session as? URLSession else { fatalError() }
if let delegate = session.delegate as? URLSessionTaskDelegate {
delegate.urlSession(session, task: task!, needNewBodyStream: { [weak self] inputStream in
if let strongSelf = self, let url = strongSelf.request.url, let inputStream = inputStream {
switch strongSelf.internalState {
case .transferInProgress(let currentTransferState):
switch currentTransferState.requestBodySource {
case is _BodyStreamSource:
let drain = strongSelf.createTransferBodyDataDrain()
let source = _BodyStreamSource(inputStream: inputStream)
let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
strongSelf.internalState = .transferInProgress(transferState)
default:
NSUnimplemented()
}
default:
//TODO: it's possible?
break
}
}
})
}
}

func updateProgressMeter(with propgress: _EasyHandle._Progress) {
Expand Down Expand Up @@ -313,8 +334,9 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {
self?.easyHandle.unpauseSend()
})
return _TransferState(url: url, bodyDataDrain: drain,bodySource: source)
case .stream:
NSUnimplemented()
case .stream(let inputStream):
let source = _BodyStreamSource(inputStream: inputStream)
return _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
}
}

Expand Down
2 changes: 2 additions & 0 deletions Foundation/URLSession/URLSessionTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ open class URLSessionTask : NSObject, NSCopying {
internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) {
if let bodyData = request.httpBody {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData)))
}else if let bodyStream = request.httpBodyStream {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream))
} else {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none)
}
Expand Down
32 changes: 31 additions & 1 deletion TestFoundation/HTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,30 @@ class _HTTPServer {
}

public func request() throws -> _HTTPRequest {
return try _HTTPRequest(request: socket.readData())
var request = try _HTTPRequest(request: socket.readData())

if Int(request.getHeader(for: "Content-Length") ?? "0") ?? 0 > 0
|| (request.getHeader(for: "Transfer-Encoding") ?? "").lowercased() == "chunked" {

// According to RFC7230 https://tools.ietf.org/html/rfc7230#section-3
// We receive messageBody after the headers, so we need read from socket minimun 2 times
//
// HTTP-message structure
//
// start-line
// *( header-field CRLF )
// CRLF
// [ message-body ]
// We receives '{numofbytes}\r\n{data}\r\n'
// TODO read data until the end

let substr = try socket.readData().split(separator: "\r\n")
if substr.count >= 2 {
request.messageBody = String(substr[1])
}
}

return request
}

public func respond(with response: _HTTPResponse, startDelay: TimeInterval? = nil, sendDelay: TimeInterval? = nil, bodyChunks: Int? = nil) throws {
Expand Down Expand Up @@ -324,6 +347,7 @@ struct _HTTPRequest {
let method: Method
let uri: String
let body: String
var messageBody: String?
let headers: [String]

public init(request: String) {
Expand Down Expand Up @@ -526,12 +550,18 @@ public class TestURLSessionServer {
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: \(value)", body: text)
return httpResponse
}

if uri == "/echo" {
return _HTTPResponse(response: .OK, body: request.messageBody ?? request.body)
}

if uri == "/redirect-with-default-port" {
let text = request.getCommaSeparatedHeaders()
let host = request.headers[1].components(separatedBy: " ")[1]
let ip = host.components(separatedBy: ":")[0]
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: http://\(ip)/redirected-with-default-port", body: text)
return httpResponse

}
return _HTTPResponse(response: .OK, body: capitals[String(uri.dropFirst())]!)
}
Expand Down
51 changes: 51 additions & 0 deletions TestFoundation/TestURLSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TestURLSession : LoopbackServerTest {
("test_dataTaskWithURLRequest", test_dataTaskWithURLRequest),
("test_dataTaskWithURLCompletionHandler", test_dataTaskWithURLCompletionHandler),
("test_dataTaskWithURLRequestCompletionHandler", test_dataTaskWithURLRequestCompletionHandler),
("test_dataTaskWithHttpInputStream", test_dataTaskWithHttpInputStream),
("test_downloadTaskWithURL", test_downloadTaskWithURL),
("test_downloadTaskWithURLRequest", test_downloadTaskWithURLRequest),
("test_downloadTaskWithRequestAndHandler", test_downloadTaskWithRequestAndHandler),
Expand Down Expand Up @@ -122,6 +123,56 @@ class TestURLSession : LoopbackServerTest {
waitForExpectations(timeout: 12)
}

func test_dataTaskWithHttpInputStream() {
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/echo"

let dataString = """
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.

Suspendisse ante eros, scelerisque ut molestie vitae, lacinia nec metus. Sed in feugiat sem. Nullam sed congue nulla, id vehicula mauris. Aliquam ultrices ultricies pellentesque. Etiam blandit ultrices quam in egestas. Donec a vulputate est, ut ultricies dui. In non maximus velit.

Vivamus vehicula faucibus odio vel maximus. Vivamus elementum, quam at accumsan rhoncus, ex ligula maximus sem, sed pretium urna enim ut urna. Donec semper porta augue at faucibus. Quisque vel congue purus. Morbi vitae elit pellentesque, finibus lectus quis, laoreet nulla. Praesent in fermentum felis. Aenean vestibulum dictum lorem quis egestas. Sed dictum elementum est laoreet volutpat.
"""

let url = URL(string: urlString)!
let urlSession = URLSession(configuration: URLSessionConfiguration.default)

var urlRequest = URLRequest(url: url)
urlRequest.httpMethod = "POST"

guard let data = dataString.data(using: .utf8) else {
XCTFail()
return
}

let inputStream = InputStream(data: data)
inputStream.open()

urlRequest.httpBodyStream = inputStream

urlRequest.setValue("en-us", forHTTPHeaderField: "Accept-Language")
urlRequest.setValue("text/xml; charset=utf-8", forHTTPHeaderField: "Content-Type")
urlRequest.setValue("chunked", forHTTPHeaderField: "Transfer-Encoding")

let expect = expectation(description: "POST \(urlString): with HTTP Body as InputStream")
let task = urlSession.dataTask(with: urlRequest) { respData, response, error in
XCTAssertNotNil(respData)
XCTAssertNotNil(response)
XCTAssertNil(error)

defer { expect.fulfill() }
guard let httpResponse = response as? HTTPURLResponse else {
XCTFail("response (\(response.debugDescription)) invalid")
return
}

XCTAssertEqual(data, respData!, "Response Data and Data is not equal")
XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200")
}
task.resume()
waitForExpectations(timeout: 12)
}

func test_downloadTaskWithURL() {
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt"
let url = URL(string: urlString)!
Expand Down