Skip to content

[5.0] URLSessionTask: implement InputStream. #1932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions Foundation/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ open class Stream: NSObject {
// InputStream is an abstract class representing the base functionality of a read stream.
// Subclassers are required to implement these methods.
open class InputStream: Stream {

enum Error: Swift.Error {
case cantSeekInputStream
}

internal let _stream: CFReadStream!

// reads up to length bytes into the supplied buffer, which must be at least of size len. Returns the actual number of bytes read.
Expand Down Expand Up @@ -156,7 +159,7 @@ open class InputStream: Stream {
return Stream.Status(rawValue: UInt(CFReadStreamGetStatus(_stream)))!
}

open override var streamError: Error? {
open override var streamError: Swift.Error? {
return CFReadStreamCopyError(_stream)
}
}
Expand Down Expand Up @@ -225,6 +228,43 @@ open class OutputStream : Stream {
}
}

extension InputStream {
func seek(to position: UInt64) throws {
guard position > 0 else {
return
}

guard position < Int.max else { throw Error.cantSeekInputStream }

let bufferSize = 1024
var remainingBytes = Int(position)

let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bufferSize, alignment: MemoryLayout<UInt8>.alignment)

guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
buffer.deallocate()
throw Error.cantSeekInputStream
}

if self.streamStatus == .notOpen {
self.open()
}

while remainingBytes > 0 && self.hasBytesAvailable {
let read = self.read(pointer, maxLength: min(bufferSize, remainingBytes))
if read == -1 {
throw Error.cantSeekInputStream
}
remainingBytes -= read
}

buffer.deallocate()
if remainingBytes != 0 {
throw Error.cantSeekInputStream
}
}
}

// Discussion of this API is ongoing for its usage of AutoreleasingUnsafeMutablePointer
#if false
extension Stream {
Expand Down
37 changes: 37 additions & 0 deletions Foundation/URLSession/BodySource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,43 @@ internal enum _BodySourceDataChunk {
case error
}

internal final class _BodyStreamSource {
let inputStream: InputStream

init(inputStream: InputStream) {
self.inputStream = inputStream
}
}

extension _BodyStreamSource : _BodySource {
func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
guard inputStream.hasBytesAvailable else {
return .done
}

let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout<UInt8>.alignment)

guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
buffer.deallocate()
return .error
}

let readBytes = self.inputStream.read(pointer, maxLength: length)
if readBytes > 0 {
let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() }))
return .data(dispatchData.subdata(in: 0 ..< readBytes))
}
else if readBytes == 0 {
buffer.deallocate()
return .done
}
else {
buffer.deallocate()
return .error
}
}
}

/// A body data source backed by `DispatchData`.
internal final class _BodyDataSource {
var data: DispatchData!
Expand Down
40 changes: 37 additions & 3 deletions Foundation/URLSession/NativeProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,40 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {

func seekInputStream(to position: UInt64) throws {
// We will reset the body source and seek forward.
NSUnimplemented()
guard let session = task?.session as? URLSession else { fatalError() }

var currentInputStream: InputStream?

if let delegate = session.delegate as? URLSessionTaskDelegate {
let dispatchGroup = DispatchGroup()
dispatchGroup.enter()

delegate.urlSession(session, task: task!, needNewBodyStream: { inputStream in
currentInputStream = inputStream
dispatchGroup.leave()
})

_ = dispatchGroup.wait(timeout: .now() + 7)
}

if let url = self.request.url, let inputStream = currentInputStream {
switch self.internalState {
case .transferInProgress(let currentTransferState):
switch currentTransferState.requestBodySource {
case is _BodyStreamSource:
try inputStream.seek(to: position)
let drain = self.createTransferBodyDataDrain()
let source = _BodyStreamSource(inputStream: inputStream)
let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
self.internalState = .transferInProgress(transferState)
default:
NSUnimplemented()
}
default:
//TODO: it's possible?
break
}
}
}

func updateProgressMeter(with propgress: _EasyHandle._Progress) {
Expand Down Expand Up @@ -313,8 +346,9 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {
self?.easyHandle.unpauseSend()
})
return _TransferState(url: url, bodyDataDrain: drain,bodySource: source)
case .stream:
NSUnimplemented()
case .stream(let inputStream):
let source = _BodyStreamSource(inputStream: inputStream)
return _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
}
}

Expand Down
2 changes: 2 additions & 0 deletions Foundation/URLSession/URLSessionTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ open class URLSessionTask : NSObject, NSCopying {
internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) {
if let bodyData = request.httpBody {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData)))
} else if let bodyStream = request.httpBodyStream {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream))
} else {
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none)
}
Expand Down
31 changes: 30 additions & 1 deletion TestFoundation/HTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,30 @@ class _HTTPServer {
}

public func request() throws -> _HTTPRequest {
return try _HTTPRequest(request: socket.readData())
var request = try _HTTPRequest(request: socket.readData())

if Int(request.getHeader(for: "Content-Length") ?? "0") ?? 0 > 0
|| (request.getHeader(for: "Transfer-Encoding") ?? "").lowercased() == "chunked" {

// According to RFC7230 https://tools.ietf.org/html/rfc7230#section-3
// We receive messageBody after the headers, so we need read from socket minimun 2 times
//
// HTTP-message structure
//
// start-line
// *( header-field CRLF )
// CRLF
// [ message-body ]
// We receives '{numofbytes}\r\n{data}\r\n'
// TODO read data until the end

let substr = try socket.readData().split(separator: "\r\n")
if substr.count >= 2 {
request.messageBody = String(substr[1])
}
}

return request
}

public func respond(with response: _HTTPResponse, startDelay: TimeInterval? = nil, sendDelay: TimeInterval? = nil, bodyChunks: Int? = nil) throws {
Expand Down Expand Up @@ -324,6 +347,7 @@ struct _HTTPRequest {
let method: Method
let uri: String
let body: String
var messageBody: String?
let headers: [String]

public init(request: String) {
Expand Down Expand Up @@ -533,13 +557,18 @@ public class TestURLSessionServer {
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: \(value)", body: text)
return httpResponse
}

if uri == "/redirect-with-default-port" {
let text = request.getCommaSeparatedHeaders()
let host = request.headers[1].components(separatedBy: " ")[1]
let ip = host.components(separatedBy: ":")[0]
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: http://\(ip)/redirected-with-default-port", body: text)
return httpResponse
}
if uri == "/echo" {
return _HTTPResponse(response: .OK, body: request.messageBody ?? request.body)
}

return _HTTPResponse(response: .OK, body: capitals[String(uri.dropFirst())]!)
}

Expand Down
74 changes: 74 additions & 0 deletions TestFoundation/TestStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,31 @@
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//

#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT
#if (os(Linux) || os(Android))
@testable import Foundation
#else
@testable import SwiftFoundation
#endif
#endif

private extension Data {
init(reading input: InputStream) {
self.init()
input.open()

let bufferSize = 1024
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bufferSize)
while input.hasBytesAvailable {
let read = input.read(buffer, maxLength: bufferSize)
self.append(buffer, count: read)
}
buffer.deallocate()

input.close()
}
}

class TestStream : XCTestCase {
static var allTests: [(String, (TestStream) -> () throws -> Void)] {
return [
Expand All @@ -15,6 +40,7 @@ class TestStream : XCTestCase {
("test_InputStreamWithFile", test_InputStreamWithFile),
("test_InputStreamHasBytesAvailable", test_InputStreamHasBytesAvailable),
("test_InputStreamInvalidPath", test_InputStreamInvalidPath),
("test_InputStreamSeekToPosition", test_InputStreamSeekToPosition),
("test_outputStreamCreationToFile", test_outputStreamCreationToFile),
("test_outputStreamCreationToBuffer", test_outputStreamCreationToBuffer),
("test_outputStreamCreationWithUrl", test_outputStreamCreationWithUrl),
Expand Down Expand Up @@ -116,6 +142,54 @@ class TestStream : XCTestCase {
XCTAssertEqual(.error, fileStream.streamStatus)
}

func test_InputStreamSeekToPosition() {
#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT
let str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel."
XCTAssert(str.count > 1024) // str.count must be bigger than buffersize inside InputStream.seek func.

func testSubdata(_ pos: UInt64) throws -> Data? {
guard let data = str.data(using: .utf8) else {
XCTFail()
return nil
}

let stream = InputStream(data: data)
stream.open()

try stream.seek(to: pos)
let streamData = Data(reading: stream)

let subdata = data[Int(pos)..<data.count]
XCTAssertEqual(streamData, subdata)

return subdata
}

var sum = 0
for i in 0...str.count {
do {
sum += try testSubdata(UInt64(i))!.count
} catch _ {
XCTFail()
}
}

XCTAssertEqual(((1 + str.count) * str.count)/2, sum) // Test on sum of arithmetic sequence :)
XCTAssertEqual(try testSubdata(UInt64(str.count))!.count, 0) // It shouldbe end

do {
try testSubdata(UInt64(str.count + 1)) // out of boundaries
XCTFail()
} catch let error as InputStream.Error {
XCTAssertEqual(error, .cantSeekInputStream)
} catch {
XCTFail()
}
#else
print("NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT is not defined, skip it")
#endif
}

func test_outputStreamCreationToFile() {
guard let filePath = createTestFile("TestFileOut.txt", _contents: Data(capacity: 256)) else {
XCTFail("Unable to create temp file");
Expand Down
51 changes: 51 additions & 0 deletions TestFoundation/TestURLSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TestURLSession : LoopbackServerTest {
("test_dataTaskWithURLRequest", test_dataTaskWithURLRequest),
("test_dataTaskWithURLCompletionHandler", test_dataTaskWithURLCompletionHandler),
("test_dataTaskWithURLRequestCompletionHandler", test_dataTaskWithURLRequestCompletionHandler),
("test_dataTaskWithHttpInputStream", test_dataTaskWithHttpInputStream),
("test_downloadTaskWithURL", test_downloadTaskWithURL),
("test_downloadTaskWithURLRequest", test_downloadTaskWithURLRequest),
("test_downloadTaskWithRequestAndHandler", test_downloadTaskWithRequestAndHandler),
Expand Down Expand Up @@ -123,6 +124,56 @@ class TestURLSession : LoopbackServerTest {
waitForExpectations(timeout: 12)
}

func test_dataTaskWithHttpInputStream() {
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/echo"

let dataString = """
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.

Suspendisse ante eros, scelerisque ut molestie vitae, lacinia nec metus. Sed in feugiat sem. Nullam sed congue nulla, id vehicula mauris. Aliquam ultrices ultricies pellentesque. Etiam blandit ultrices quam in egestas. Donec a vulputate est, ut ultricies dui. In non maximus velit.

Vivamus vehicula faucibus odio vel maximus. Vivamus elementum, quam at accumsan rhoncus, ex ligula maximus sem, sed pretium urna enim ut urna. Donec semper porta augue at faucibus. Quisque vel congue purus. Morbi vitae elit pellentesque, finibus lectus quis, laoreet nulla. Praesent in fermentum felis. Aenean vestibulum dictum lorem quis egestas. Sed dictum elementum est laoreet volutpat.
"""

let url = URL(string: urlString)!
let urlSession = URLSession(configuration: URLSessionConfiguration.default)

var urlRequest = URLRequest(url: url)
urlRequest.httpMethod = "POST"

guard let data = dataString.data(using: .utf8) else {
XCTFail()
return
}

let inputStream = InputStream(data: data)
inputStream.open()

urlRequest.httpBodyStream = inputStream

urlRequest.setValue("en-us", forHTTPHeaderField: "Accept-Language")
urlRequest.setValue("text/xml; charset=utf-8", forHTTPHeaderField: "Content-Type")
urlRequest.setValue("chunked", forHTTPHeaderField: "Transfer-Encoding")

let expect = expectation(description: "POST \(urlString): with HTTP Body as InputStream")
let task = urlSession.dataTask(with: urlRequest) { respData, response, error in
XCTAssertNotNil(respData)
XCTAssertNotNil(response)
XCTAssertNil(error)

defer { expect.fulfill() }
guard let httpResponse = response as? HTTPURLResponse else {
XCTFail("response (\(response.debugDescription)) invalid")
return
}

XCTAssertEqual(data, respData!, "Response Data and Data is not equal")
XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200")
}
task.resume()
waitForExpectations(timeout: 12)
}

func test_downloadTaskWithURL() {
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt"
let url = URL(string: urlString)!
Expand Down