@@ -41,14 +41,14 @@ public enum Algorithm: CaseIterable {
41
41
42
42
/// Compression errors
43
43
public enum FilterError : Error {
44
- /// Filter failed to initialize
45
- case filterInitError
46
-
47
- /// Invalid data in a call to compression_stream_process
48
- case filterProcessError
49
-
50
- /// Non -empty write after an output filter has been finalized
51
- case writeToFinalizedFilter
44
+ /// Filter failed to initialize,
45
+ /// invalid internal state,
46
+ /// invalid parameters
47
+ case state
48
+
49
+ /// Invalid data in a call to compression_stream_process,
50
+ /// non -empty write after an output filter has been finalized
51
+ case data
52
52
}
53
53
54
54
/// Compression filter direction of operation, compress/decompress
@@ -67,29 +67,29 @@ public enum FilterOperation {
67
67
}
68
68
}
69
69
70
- @available ( macOS 10 . 12 , iOS 10 . 0 , watchOS 3 . 0 , tvOS 10 . 0 , * )
70
+ @available ( macOS 9999 , iOS 9999 , watchOS 9999 , tvOS 9999 , * )
71
71
extension compression_stream {
72
72
73
73
/// Initialize a compression_stream struct
74
74
///
75
75
/// - Parameter operation: direction of operation
76
76
/// - Parameter algorithm: compression algorithm
77
77
///
78
- /// - Throws: `FilterError.filterInitError ` if `algorithm` is not supported
78
+ /// - Throws: `FilterError.state ` if `algorithm` is not supported
79
79
/// by the Compression stream API
80
80
///
81
81
internal init ( operation: FilterOperation , algorithm: Algorithm ) throws {
82
- self . init ( dst_ptr: UnsafeMutablePointer< UInt8> . allocate ( capacity : 0 ) ,
82
+ self . init ( dst_ptr: UnsafeMutablePointer < UInt8 > ( bitPattern : - 1 ) ! ,
83
83
dst_size: 0 ,
84
- src_ptr: UnsafeMutablePointer< UInt8> . allocate ( capacity : 0 ) ,
84
+ src_ptr: UnsafeMutablePointer < UInt8 > ( bitPattern : - 1 ) ! ,
85
85
src_size: 0 ,
86
86
state: nil )
87
87
let status = compression_stream_init ( & self , operation. rawValue, algorithm. rawValue)
88
- guard status == COMPRESSION_STATUS_OK else { throw FilterError . filterInitError }
88
+ guard status == COMPRESSION_STATUS_OK else { throw FilterError . state }
89
89
}
90
90
}
91
91
92
- @available ( macOS 10 . 12 , iOS 10 . 0 , watchOS 3 . 0 , tvOS 10 . 0 , * )
92
+ @available ( macOS 9999 , iOS 9999 , watchOS 9999 , tvOS 9999 , * )
93
93
public class OutputFilter {
94
94
private var _stream : compression_stream
95
95
private var _buf : UnsafeMutablePointer < UInt8 >
@@ -105,7 +105,7 @@ public class OutputFilter {
105
105
/// - bufferCapacity: capacity of the internal data buffer
106
106
/// - writeFunc: called to write the processed data
107
107
///
108
- /// - Throws: `FilterError.StreamInitError ` if stream initialization failed
108
+ /// - Throws: `FilterError.state ` if stream initialization failed
109
109
public init (
110
110
_ operation: FilterOperation ,
111
111
using algorithm: Algorithm ,
@@ -127,21 +127,22 @@ public class OutputFilter {
127
127
/// - Parameter data: data to process
128
128
///
129
129
/// - Throws:
130
- /// `FilterError.filterProcessError` if an error occurs during processing
131
- /// `FilterError.writeToFinalizedFilter` if `data` is not empty/nil, and the
132
- /// filter is the finalized state
133
- public func write( _ data: Data ? ) throws {
130
+ /// `FilterError.data` if an error occurs during processing,
131
+ /// or if `data` is not empty/nil, and the filter is the finalized state
132
+ public func write< D : DataProtocol > ( _ data: D ? ) throws {
134
133
// Finalize if data is empty/nil
135
134
if data == nil || data!. isEmpty { try finalize ( ) ; return }
136
135
137
136
// Fail if already finalized
138
- if _finalized { throw FilterError . writeToFinalizedFilter }
137
+ if _finalized { throw FilterError . data }
139
138
140
139
// Process all incoming data
141
- try data!. withUnsafeBytes { ( src_ptr: UnsafePointer < UInt8 > ) in
142
- _stream. src_size = data!. count
143
- _stream. src_ptr = src_ptr
144
- while ( _stream. src_size > 0 ) { _ = try process ( finalizing: false ) }
140
+ for region in data!. regions {
141
+ try region. withUnsafeBytes { ( raw_src_ptr: UnsafeRawBufferPointer ) in
142
+ _stream. src_size = region. count
143
+ _stream. src_ptr = raw_src_ptr. baseAddress!. assumingMemoryBound ( to: UInt8 . self)
144
+ while ( _stream. src_size > 0 ) { _ = try process ( finalizing: false ) }
145
+ }
145
146
}
146
147
}
147
148
@@ -151,7 +152,7 @@ public class OutputFilter {
151
152
/// When all output has been sent, the writingTo closure is called one last time with nil data.
152
153
/// Once the stream is finalized, writing non empty/nil data to the stream will throw an exception.
153
154
///
154
- /// - Throws: `FilterError.StreamProcessError ` if an error occurs during processing
155
+ /// - Throws: `FilterError.data ` if an error occurs during processing
155
156
public func finalize( ) throws {
156
157
// Do nothing if already finalized
157
158
if _finalized { return }
@@ -186,7 +187,7 @@ public class OutputFilter {
186
187
_stream. dst_size = _bufCapacity
187
188
188
189
let status = compression_stream_process ( & _stream, ( finalize ? Int32 ( COMPRESSION_STREAM_FINALIZE . rawValue) : 0 ) )
189
- guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . filterProcessError }
190
+ guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . data }
190
191
191
192
// Number of bytes written to buf
192
193
let writtenBytes = _bufCapacity - _stream. dst_size
@@ -202,12 +203,71 @@ public class OutputFilter {
202
203
203
204
}
204
205
205
- @available ( macOS 10 . 12 , iOS 10 . 0 , watchOS 3 . 0 , tvOS 10 . 0 , * )
206
- public class InputFilter {
206
+ @available ( macOS 9999 , iOS 9999 , watchOS 9999 , tvOS 9999 , * )
207
+ public class InputFilter < D: DataProtocol > {
208
+
209
+ // Internal buffer to read bytes from a DataProtocol implementation
210
+ private class InputFilterBuffer < D: DataProtocol > {
211
+ private var _data : D // current input data
212
+ private var _remaining : Int // total bytes remaining to process in _data
213
+ private var _regionIndex : D . Regions . Index // region being read in _data
214
+ private var _regionRemaining : Int // remaining bytes to read in region being read in _data
215
+ public init ( _ data: D ) throws {
216
+ _data = data
217
+ _remaining = _data. count
218
+ _regionRemaining = 0
219
+ _regionIndex = _data. regions. startIndex
220
+ if _regionIndex != _data. regions. endIndex { _regionRemaining = _data. regions [ _regionIndex] . count }
221
+ // Advance to first non-zero region
222
+ try advance ( by: 0 )
223
+ }
224
+
225
+ // Return number of remaining bytes
226
+ public func remaining( ) -> Int { return _remaining }
227
+
228
+ // Call f with a buffer to the remaining bytes of the current contiguous region
229
+ public func withUnsafeBytes< R> ( _ body: ( UnsafeRawBufferPointer ) throws -> R ) rethrows -> R ? {
230
+ if _remaining == 0 {
231
+ return try body ( UnsafeRawBufferPointer ( start: nil , count: 0 ) )
232
+ } else {
233
+ let r = _data. regions [ _regionIndex]
234
+ return try r. withUnsafeBytes { ( region_buf: UnsafeRawBufferPointer ) in
235
+ let src_buf = UnsafeRawBufferPointer (
236
+ start: region_buf. baseAddress!. assumingMemoryBound ( to: UInt8 . self) + region_buf. count - _regionRemaining,
237
+ count: _regionRemaining)
238
+ return try body ( src_buf)
239
+ }
240
+ }
241
+ }
242
+
243
+ // Consume n bytes in the current region (n can be 0, up to _regionRemaining), and move to next non-empty region if needed
244
+ // post-condition: _remaining == 0 || _regionRemaining > 0
245
+ public func advance( by n: Int ) throws {
246
+
247
+ // Sanity checks
248
+ if n > _regionRemaining { throw FilterError . state } // invalid n
249
+
250
+ // Update counters
251
+ _regionRemaining -= n
252
+ _remaining -= n
253
+
254
+ // Move to next non-empty region if we are done with the current one
255
+ let r = _data. regions
256
+ while _regionRemaining == 0 {
257
+ _regionIndex = r. index ( after: _regionIndex)
258
+ if _regionIndex == r. endIndex { break }
259
+ _regionRemaining = r [ _regionIndex] . count
260
+ }
261
+
262
+ // Sanity checks
263
+ if _remaining != 0 && _regionRemaining == 0 { throw FilterError . state }
264
+ }
265
+ }
266
+
267
+ private let _readCapacity : Int // size to use when calling _readFunc
268
+ private let _readFunc : ( Int ) throws -> D ? // caller-provided read function
207
269
private var _stream : compression_stream
208
- private var _buf : Data ? = nil // current input data
209
- private let _bufCapacity : Int // size to read when refilling _buf
210
- private let _readFunc : ( Int ) throws -> Data ?
270
+ private var _buf : InputFilterBuffer < D > ? = nil // input
211
271
private var _eofReached : Bool = false // did we read end-of-file from the input?
212
272
private var _endReached : Bool = false // did we reach end-of-file from the decoder stream?
213
273
@@ -219,15 +279,15 @@ public class InputFilter {
219
279
/// - bufferCapacity: capacity of the internal data buffer
220
280
/// - readFunc: called to read the input data
221
281
///
222
- /// - Throws: `FilterError.filterInitError ` if filter initialization failed
282
+ /// - Throws: `FilterError.state ` if filter initialization failed
223
283
public init (
224
284
_ operation: FilterOperation ,
225
285
using algorithm: Algorithm ,
226
286
bufferCapacity: Int = 65536 ,
227
- readingFrom readFunc: @escaping ( Int ) throws -> Data ?
287
+ readingFrom readFunc: @escaping ( Int ) throws -> D ?
228
288
) throws {
229
289
_stream = try compression_stream ( operation: operation, algorithm: algorithm)
230
- _bufCapacity = bufferCapacity
290
+ _readCapacity = bufferCapacity
231
291
_readFunc = readFunc
232
292
}
233
293
@@ -243,7 +303,7 @@ public class InputFilter {
243
303
/// - Returns: a new Data object containing at most `count` output bytes, or nil if no more data is available
244
304
///
245
305
/// - Throws:
246
- /// `FilterError.filterProcessError ` if an error occurs during processing
306
+ /// `FilterError.data ` if an error occurs during processing
247
307
public func readData( ofLength count: Int ) throws -> Data ? {
248
308
// Sanity check
249
309
precondition ( count > 0 , " number of bytes to read can't be 0 " )
@@ -263,32 +323,37 @@ public class InputFilter {
263
323
while _stream. dst_size > 0 && !_endReached {
264
324
265
325
// Refill _buf if needed, and EOF was not yet read
266
- if _stream. src_size == 0 && !_eofReached {
267
- _buf = try _readFunc ( _bufCapacity) // may be nil
268
- // Reset src_size to full _buf size
269
- if _buf? . count ?? 0 == 0 { _eofReached = true }
270
- _stream. src_size = _buf? . count ?? 0
326
+ if ( _buf? . remaining ( ) ?? 0 ) == 0 && !_eofReached {
327
+ let data = try _readFunc ( _readCapacity) // may be nil, or empty
328
+ if data? . count ?? 0 == 0 { // nil or empty -> EOF
329
+ _eofReached = true
330
+ _buf = nil
331
+ } else {
332
+ _buf = try InputFilterBuffer < D > ( data!)
333
+ }
271
334
}
272
335
273
336
// Process some data
274
337
if let buf = _buf {
275
- try buf. withUnsafeBytes { ( src_ptr: UnsafePointer < UInt8 > ) in
276
-
277
- // Next byte to read
278
- _stream. src_ptr = src_ptr + buf. count - _stream. src_size
279
-
338
+ try buf. withUnsafeBytes { ( src_buf: UnsafeRawBufferPointer ) in
339
+ // Point to buffer
340
+ _stream. src_ptr = src_buf. baseAddress!. assumingMemoryBound ( to: UInt8 . self)
341
+ _stream. src_size = src_buf. count
280
342
let status = compression_stream_process ( & _stream, ( _eofReached ? Int32 ( COMPRESSION_STREAM_FINALIZE . rawValue) : 0 ) )
281
- guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . filterProcessError }
343
+ guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . data }
282
344
if status == COMPRESSION_STATUS_END { _endReached = true }
345
+ // Advance by the number of consumed bytes
346
+ let consumed = src_buf. count - _stream. src_size
347
+ try buf. advance ( by: consumed)
283
348
}
284
- }
285
- else {
349
+ } else {
350
+ // No data available, process until END reached
286
351
let status = compression_stream_process ( & _stream, ( _eofReached ? Int32 ( COMPRESSION_STREAM_FINALIZE . rawValue) : 0 ) )
287
- guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . filterProcessError }
352
+ guard status != COMPRESSION_STATUS_ERROR else { throw FilterError . data }
288
353
if status == COMPRESSION_STATUS_END { _endReached = true }
289
-
290
354
}
291
- }
355
+
356
+ } // _stream.dst_size > 0 && !_endReached
292
357
293
358
} // result.withUnsafeMutableBytes
294
359
0 commit comments