Skip to content

Commit a9649e5

Browse files
authored
Merge pull request #1932 from albertaleksieiev/url-session-task/input-stream-swift-5
[5.0] URLSessionTask: implement InputStream.
2 parents 762edc3 + 4d7c7a4 commit a9649e5

File tree

7 files changed

+273
-6
lines changed

7 files changed

+273
-6
lines changed

Foundation/Stream.swift

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ open class Stream: NSObject {
114114
// InputStream is an abstract class representing the base functionality of a read stream.
115115
// Subclassers are required to implement these methods.
116116
open class InputStream: Stream {
117-
117+
enum Error: Swift.Error {
118+
case cantSeekInputStream
119+
}
120+
118121
internal let _stream: CFReadStream!
119122

120123
// reads up to length bytes into the supplied buffer, which must be at least of size len. Returns the actual number of bytes read.
@@ -156,7 +159,7 @@ open class InputStream: Stream {
156159
return Stream.Status(rawValue: UInt(CFReadStreamGetStatus(_stream)))!
157160
}
158161

159-
open override var streamError: Error? {
162+
open override var streamError: Swift.Error? {
160163
return CFReadStreamCopyError(_stream)
161164
}
162165
}
@@ -225,6 +228,43 @@ open class OutputStream : Stream {
225228
}
226229
}
227230

231+
extension InputStream {
232+
func seek(to position: UInt64) throws {
233+
guard position > 0 else {
234+
return
235+
}
236+
237+
guard position < Int.max else { throw Error.cantSeekInputStream }
238+
239+
let bufferSize = 1024
240+
var remainingBytes = Int(position)
241+
242+
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bufferSize, alignment: MemoryLayout<UInt8>.alignment)
243+
244+
guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
245+
buffer.deallocate()
246+
throw Error.cantSeekInputStream
247+
}
248+
249+
if self.streamStatus == .notOpen {
250+
self.open()
251+
}
252+
253+
while remainingBytes > 0 && self.hasBytesAvailable {
254+
let read = self.read(pointer, maxLength: min(bufferSize, remainingBytes))
255+
if read == -1 {
256+
throw Error.cantSeekInputStream
257+
}
258+
remainingBytes -= read
259+
}
260+
261+
buffer.deallocate()
262+
if remainingBytes != 0 {
263+
throw Error.cantSeekInputStream
264+
}
265+
}
266+
}
267+
228268
// Discussion of this API is ongoing for its usage of AutoreleasingUnsafeMutablePointer
229269
#if false
230270
extension Stream {

Foundation/URLSession/BodySource.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,43 @@ internal enum _BodySourceDataChunk {
5555
case error
5656
}
5757

58+
internal final class _BodyStreamSource {
59+
let inputStream: InputStream
60+
61+
init(inputStream: InputStream) {
62+
self.inputStream = inputStream
63+
}
64+
}
65+
66+
extension _BodyStreamSource : _BodySource {
67+
func getNextChunk(withLength length: Int) -> _BodySourceDataChunk {
68+
guard inputStream.hasBytesAvailable else {
69+
return .done
70+
}
71+
72+
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout<UInt8>.alignment)
73+
74+
guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else {
75+
buffer.deallocate()
76+
return .error
77+
}
78+
79+
let readBytes = self.inputStream.read(pointer, maxLength: length)
80+
if readBytes > 0 {
81+
let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() }))
82+
return .data(dispatchData.subdata(in: 0 ..< readBytes))
83+
}
84+
else if readBytes == 0 {
85+
buffer.deallocate()
86+
return .done
87+
}
88+
else {
89+
buffer.deallocate()
90+
return .error
91+
}
92+
}
93+
}
94+
5895
/// A body data source backed by `DispatchData`.
5996
internal final class _BodyDataSource {
6097
var data: DispatchData!

Foundation/URLSession/NativeProtocol.swift

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,40 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {
261261

262262
func seekInputStream(to position: UInt64) throws {
263263
// We will reset the body source and seek forward.
264-
NSUnimplemented()
264+
guard let session = task?.session as? URLSession else { fatalError() }
265+
266+
var currentInputStream: InputStream?
267+
268+
if let delegate = session.delegate as? URLSessionTaskDelegate {
269+
let dispatchGroup = DispatchGroup()
270+
dispatchGroup.enter()
271+
272+
delegate.urlSession(session, task: task!, needNewBodyStream: { inputStream in
273+
currentInputStream = inputStream
274+
dispatchGroup.leave()
275+
})
276+
277+
_ = dispatchGroup.wait(timeout: .now() + 7)
278+
}
279+
280+
if let url = self.request.url, let inputStream = currentInputStream {
281+
switch self.internalState {
282+
case .transferInProgress(let currentTransferState):
283+
switch currentTransferState.requestBodySource {
284+
case is _BodyStreamSource:
285+
try inputStream.seek(to: position)
286+
let drain = self.createTransferBodyDataDrain()
287+
let source = _BodyStreamSource(inputStream: inputStream)
288+
let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
289+
self.internalState = .transferInProgress(transferState)
290+
default:
291+
NSUnimplemented()
292+
}
293+
default:
294+
//TODO: it's possible?
295+
break
296+
}
297+
}
265298
}
266299

267300
func updateProgressMeter(with propgress: _EasyHandle._Progress) {
@@ -313,8 +346,9 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate {
313346
self?.easyHandle.unpauseSend()
314347
})
315348
return _TransferState(url: url, bodyDataDrain: drain,bodySource: source)
316-
case .stream:
317-
NSUnimplemented()
349+
case .stream(let inputStream):
350+
let source = _BodyStreamSource(inputStream: inputStream)
351+
return _TransferState(url: url, bodyDataDrain: drain, bodySource: source)
318352
}
319353
}
320354

Foundation/URLSession/URLSessionTask.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ open class URLSessionTask : NSObject, NSCopying {
5757
internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) {
5858
if let bodyData = request.httpBody {
5959
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData)))
60+
} else if let bodyStream = request.httpBodyStream {
61+
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream))
6062
} else {
6163
self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none)
6264
}

TestFoundation/HTTPServer.swift

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,30 @@ class _HTTPServer {
210210
}
211211

212212
public func request() throws -> _HTTPRequest {
213-
return try _HTTPRequest(request: socket.readData())
213+
var request = try _HTTPRequest(request: socket.readData())
214+
215+
if Int(request.getHeader(for: "Content-Length") ?? "0") ?? 0 > 0
216+
|| (request.getHeader(for: "Transfer-Encoding") ?? "").lowercased() == "chunked" {
217+
218+
// According to RFC7230 https://tools.ietf.org/html/rfc7230#section-3
219+
// We receive messageBody after the headers, so we need read from socket minimun 2 times
220+
//
221+
// HTTP-message structure
222+
//
223+
// start-line
224+
// *( header-field CRLF )
225+
// CRLF
226+
// [ message-body ]
227+
// We receives '{numofbytes}\r\n{data}\r\n'
228+
// TODO read data until the end
229+
230+
let substr = try socket.readData().split(separator: "\r\n")
231+
if substr.count >= 2 {
232+
request.messageBody = String(substr[1])
233+
}
234+
}
235+
236+
return request
214237
}
215238

216239
public func respond(with response: _HTTPResponse, startDelay: TimeInterval? = nil, sendDelay: TimeInterval? = nil, bodyChunks: Int? = nil) throws {
@@ -324,6 +347,7 @@ struct _HTTPRequest {
324347
let method: Method
325348
let uri: String
326349
let body: String
350+
var messageBody: String?
327351
let headers: [String]
328352

329353
public init(request: String) {
@@ -533,13 +557,18 @@ public class TestURLSessionServer {
533557
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: \(value)", body: text)
534558
return httpResponse
535559
}
560+
536561
if uri == "/redirect-with-default-port" {
537562
let text = request.getCommaSeparatedHeaders()
538563
let host = request.headers[1].components(separatedBy: " ")[1]
539564
let ip = host.components(separatedBy: ":")[0]
540565
let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: http://\(ip)/redirected-with-default-port", body: text)
541566
return httpResponse
542567
}
568+
if uri == "/echo" {
569+
return _HTTPResponse(response: .OK, body: request.messageBody ?? request.body)
570+
}
571+
543572
return _HTTPResponse(response: .OK, body: capitals[String(uri.dropFirst())]!)
544573
}
545574

TestFoundation/TestStream.swift

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,31 @@
77
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
88
//
99

10+
#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT
11+
#if (os(Linux) || os(Android))
12+
@testable import Foundation
13+
#else
14+
@testable import SwiftFoundation
15+
#endif
16+
#endif
17+
18+
private extension Data {
19+
init(reading input: InputStream) {
20+
self.init()
21+
input.open()
22+
23+
let bufferSize = 1024
24+
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bufferSize)
25+
while input.hasBytesAvailable {
26+
let read = input.read(buffer, maxLength: bufferSize)
27+
self.append(buffer, count: read)
28+
}
29+
buffer.deallocate()
30+
31+
input.close()
32+
}
33+
}
34+
1035
class TestStream : XCTestCase {
1136
static var allTests: [(String, (TestStream) -> () throws -> Void)] {
1237
return [
@@ -15,6 +40,7 @@ class TestStream : XCTestCase {
1540
("test_InputStreamWithFile", test_InputStreamWithFile),
1641
("test_InputStreamHasBytesAvailable", test_InputStreamHasBytesAvailable),
1742
("test_InputStreamInvalidPath", test_InputStreamInvalidPath),
43+
("test_InputStreamSeekToPosition", test_InputStreamSeekToPosition),
1844
("test_outputStreamCreationToFile", test_outputStreamCreationToFile),
1945
("test_outputStreamCreationToBuffer", test_outputStreamCreationToBuffer),
2046
("test_outputStreamCreationWithUrl", test_outputStreamCreationWithUrl),
@@ -116,6 +142,54 @@ class TestStream : XCTestCase {
116142
XCTAssertEqual(.error, fileStream.streamStatus)
117143
}
118144

145+
func test_InputStreamSeekToPosition() {
146+
#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT
147+
let str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel."
148+
XCTAssert(str.count > 1024) // str.count must be bigger than buffersize inside InputStream.seek func.
149+
150+
func testSubdata(_ pos: UInt64) throws -> Data? {
151+
guard let data = str.data(using: .utf8) else {
152+
XCTFail()
153+
return nil
154+
}
155+
156+
let stream = InputStream(data: data)
157+
stream.open()
158+
159+
try stream.seek(to: pos)
160+
let streamData = Data(reading: stream)
161+
162+
let subdata = data[Int(pos)..<data.count]
163+
XCTAssertEqual(streamData, subdata)
164+
165+
return subdata
166+
}
167+
168+
var sum = 0
169+
for i in 0...str.count {
170+
do {
171+
sum += try testSubdata(UInt64(i))!.count
172+
} catch _ {
173+
XCTFail()
174+
}
175+
}
176+
177+
XCTAssertEqual(((1 + str.count) * str.count)/2, sum) // Test on sum of arithmetic sequence :)
178+
XCTAssertEqual(try testSubdata(UInt64(str.count))!.count, 0) // It shouldbe end
179+
180+
do {
181+
try testSubdata(UInt64(str.count + 1)) // out of boundaries
182+
XCTFail()
183+
} catch let error as InputStream.Error {
184+
XCTAssertEqual(error, .cantSeekInputStream)
185+
} catch {
186+
XCTFail()
187+
}
188+
#else
189+
print("NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT is not defined, skip it")
190+
#endif
191+
}
192+
119193
func test_outputStreamCreationToFile() {
120194
guard let filePath = createTestFile("TestFileOut.txt", _contents: Data(capacity: 256)) else {
121195
XCTFail("Unable to create temp file");

TestFoundation/TestURLSession.swift

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class TestURLSession : LoopbackServerTest {
1515
("test_dataTaskWithURLRequest", test_dataTaskWithURLRequest),
1616
("test_dataTaskWithURLCompletionHandler", test_dataTaskWithURLCompletionHandler),
1717
("test_dataTaskWithURLRequestCompletionHandler", test_dataTaskWithURLRequestCompletionHandler),
18+
("test_dataTaskWithHttpInputStream", test_dataTaskWithHttpInputStream),
1819
("test_downloadTaskWithURL", test_downloadTaskWithURL),
1920
("test_downloadTaskWithURLRequest", test_downloadTaskWithURLRequest),
2021
("test_downloadTaskWithRequestAndHandler", test_downloadTaskWithRequestAndHandler),
@@ -123,6 +124,56 @@ class TestURLSession : LoopbackServerTest {
123124
waitForExpectations(timeout: 12)
124125
}
125126

127+
func test_dataTaskWithHttpInputStream() {
128+
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/echo"
129+
130+
let dataString = """
131+
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.
132+
133+
Suspendisse ante eros, scelerisque ut molestie vitae, lacinia nec metus. Sed in feugiat sem. Nullam sed congue nulla, id vehicula mauris. Aliquam ultrices ultricies pellentesque. Etiam blandit ultrices quam in egestas. Donec a vulputate est, ut ultricies dui. In non maximus velit.
134+
135+
Vivamus vehicula faucibus odio vel maximus. Vivamus elementum, quam at accumsan rhoncus, ex ligula maximus sem, sed pretium urna enim ut urna. Donec semper porta augue at faucibus. Quisque vel congue purus. Morbi vitae elit pellentesque, finibus lectus quis, laoreet nulla. Praesent in fermentum felis. Aenean vestibulum dictum lorem quis egestas. Sed dictum elementum est laoreet volutpat.
136+
"""
137+
138+
let url = URL(string: urlString)!
139+
let urlSession = URLSession(configuration: URLSessionConfiguration.default)
140+
141+
var urlRequest = URLRequest(url: url)
142+
urlRequest.httpMethod = "POST"
143+
144+
guard let data = dataString.data(using: .utf8) else {
145+
XCTFail()
146+
return
147+
}
148+
149+
let inputStream = InputStream(data: data)
150+
inputStream.open()
151+
152+
urlRequest.httpBodyStream = inputStream
153+
154+
urlRequest.setValue("en-us", forHTTPHeaderField: "Accept-Language")
155+
urlRequest.setValue("text/xml; charset=utf-8", forHTTPHeaderField: "Content-Type")
156+
urlRequest.setValue("chunked", forHTTPHeaderField: "Transfer-Encoding")
157+
158+
let expect = expectation(description: "POST \(urlString): with HTTP Body as InputStream")
159+
let task = urlSession.dataTask(with: urlRequest) { respData, response, error in
160+
XCTAssertNotNil(respData)
161+
XCTAssertNotNil(response)
162+
XCTAssertNil(error)
163+
164+
defer { expect.fulfill() }
165+
guard let httpResponse = response as? HTTPURLResponse else {
166+
XCTFail("response (\(response.debugDescription)) invalid")
167+
return
168+
}
169+
170+
XCTAssertEqual(data, respData!, "Response Data and Data is not equal")
171+
XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200")
172+
}
173+
task.resume()
174+
waitForExpectations(timeout: 12)
175+
}
176+
126177
func test_downloadTaskWithURL() {
127178
let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt"
128179
let url = URL(string: urlString)!

0 commit comments

Comments
 (0)