Skip to content

[5.1] DataProtocol inputs in Compression overlay #24076

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 150 additions & 61 deletions stdlib/public/Darwin/Compression/Compression.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import Foundation
@_exported import Compression

/// Compression algorithms, wraps the C API constants.
public enum Algorithm: CaseIterable {
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public enum Algorithm: CaseIterable, RawRepresentable {

/// LZFSE
case lzfse
Expand All @@ -29,6 +30,16 @@ public enum Algorithm: CaseIterable {
/// LZMA in a XZ container
case lzma

public init?(rawValue: compression_algorithm) {
switch rawValue {
case COMPRESSION_LZFSE: self = .lzfse
case COMPRESSION_ZLIB: self = .zlib
case COMPRESSION_LZ4: self = .lz4
case COMPRESSION_LZMA: self = .lzma
default: return nil
}
}

public var rawValue: compression_algorithm {
switch self {
case .lzfse: return COMPRESSION_LZFSE
Expand All @@ -39,26 +50,24 @@ public enum Algorithm: CaseIterable {
}
}

/// Compression errors
public enum FilterError: Error {
/// Filter failed to initialize
case filterInitError

/// Invalid data in a call to compression_stream_process
case filterProcessError

/// Non-empty write after an output filter has been finalized
case writeToFinalizedFilter
}

/// Compression filter direction of operation, compress/decompress
public enum FilterOperation {
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public enum FilterOperation: RawRepresentable {

/// Compress raw data to a compressed payload
case compress

/// Decompress a compressed payload to raw data
case decompress

public init?(rawValue: compression_stream_operation) {
switch rawValue {
case COMPRESSION_STREAM_ENCODE: self = .compress
case COMPRESSION_STREAM_DECODE: self = .decompress
default: return nil
}
}

public var rawValue: compression_stream_operation {
switch self {
case .compress: return COMPRESSION_STREAM_ENCODE
Expand All @@ -67,34 +76,48 @@ public enum FilterOperation {
}
}

@available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *)
/// Compression errors
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public enum FilterError: Error {

/// Filter failed to initialize,
/// or invalid internal state,
/// or invalid parameters
case invalidState

/// Invalid data in a call to compression_stream_process,
/// or non-empty write after an output filter has been finalized
case invalidData
}

@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
extension compression_stream {

/// Initialize a compression_stream struct
///
/// - Parameter operation: direction of operation
/// - Parameter algorithm: compression algorithm
///
/// - Throws: `FilterError.filterInitError` if `algorithm` is not supported
/// - Throws: `FilterError.invalidState` if `algorithm` is not supported
/// by the Compression stream API
///
internal init(operation: FilterOperation, algorithm: Algorithm) throws {
self.init(dst_ptr: UnsafeMutablePointer<UInt8>.allocate(capacity:0),
self.init(dst_ptr: UnsafeMutablePointer<UInt8>(bitPattern: -1)!,
dst_size: 0,
src_ptr: UnsafeMutablePointer<UInt8>.allocate(capacity:0),
src_ptr: UnsafeMutablePointer<UInt8>(bitPattern: -1)!,
src_size: 0,
state: nil)
let status = compression_stream_init(&self, operation.rawValue, algorithm.rawValue)
guard status == COMPRESSION_STATUS_OK else { throw FilterError.filterInitError }
guard status == COMPRESSION_STATUS_OK else { throw FilterError.invalidState }
}
}

@available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *)
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public class OutputFilter {
private var _stream: compression_stream
private var _buf: UnsafeMutablePointer<UInt8>
private let _bufCapacity: Int
private let _writeFunc: (Data?) throws -> ()
private let _writeFunc: (Data?) throws -> Void
private var _finalized: Bool = false

/// Initialize an output filter
Expand All @@ -105,12 +128,12 @@ public class OutputFilter {
/// - bufferCapacity: capacity of the internal data buffer
/// - writeFunc: called to write the processed data
///
/// - Throws: `FilterError.StreamInitError` if stream initialization failed
/// - Throws: `FilterError.invalidState` if stream initialization failed
public init(
_ operation: FilterOperation,
using algorithm: Algorithm,
bufferCapacity: Int = 65536,
writingTo writeFunc: @escaping (Data?) throws -> ()
writingTo writeFunc: @escaping (Data?) throws -> Void
) throws {
_stream = try compression_stream(operation: operation, algorithm: algorithm)
_buf = UnsafeMutablePointer<UInt8>.allocate(capacity: bufferCapacity)
Expand All @@ -127,21 +150,22 @@ public class OutputFilter {
/// - Parameter data: data to process
///
/// - Throws:
/// `FilterError.filterProcessError` if an error occurs during processing
/// `FilterError.writeToFinalizedFilter` if `data` is not empty/nil, and the
/// filter is the finalized state
public func write(_ data: Data?) throws {
/// `FilterError.invalidData` if an error occurs during processing,
/// or if `data` is not empty/nil, and the filter is the finalized state
public func write<D : DataProtocol>(_ data: D?) throws {
// Finalize if data is empty/nil
if data == nil || data!.isEmpty { try finalize() ; return }

// Fail if already finalized
if _finalized { throw FilterError.writeToFinalizedFilter }
if _finalized { throw FilterError.invalidData }

// Process all incoming data
try data!.withUnsafeBytes { (src_ptr: UnsafePointer<UInt8>) in
_stream.src_size = data!.count
_stream.src_ptr = src_ptr
while (_stream.src_size > 0) { _ = try process(finalizing: false) }
for region in data!.regions {
try region.withUnsafeBytes { (raw_src_ptr: UnsafeRawBufferPointer) in
_stream.src_size = region.count
_stream.src_ptr = raw_src_ptr.baseAddress!.assumingMemoryBound(to: UInt8.self)
while (_stream.src_size > 0) { _ = try _process(finalizing: false) }
}
}
}

Expand All @@ -151,15 +175,15 @@ public class OutputFilter {
/// When all output has been sent, the writingTo closure is called one last time with nil data.
/// Once the stream is finalized, writing non empty/nil data to the stream will throw an exception.
///
/// - Throws: `FilterError.StreamProcessError` if an error occurs during processing
/// - Throws: `FilterError.invalidData` if an error occurs during processing
public func finalize() throws {
// Do nothing if already finalized
if _finalized { return }

// Finalize stream
_stream.src_size = 0
var status = COMPRESSION_STATUS_OK
while (status != COMPRESSION_STATUS_END) { status = try process(finalizing: true) }
while (status != COMPRESSION_STATUS_END) { status = try _process(finalizing: true) }

// Update state
_finalized = true
Expand All @@ -180,13 +204,13 @@ public class OutputFilter {

// Call compression_stream_process with current src, and dst set to _buf, then write output to the closure
// Return status
private func process(finalizing finalize: Bool) throws -> compression_status {
private func _process(finalizing finalize: Bool) throws -> compression_status {
// Process current input, and write to buf
_stream.dst_ptr = _buf
_stream.dst_size = _bufCapacity

let status = compression_stream_process(&_stream, (finalize ? Int32(COMPRESSION_STREAM_FINALIZE.rawValue) : 0))
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.filterProcessError }
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.invalidData }

// Number of bytes written to buf
let writtenBytes = _bufCapacity - _stream.dst_size
Expand All @@ -202,12 +226,72 @@ public class OutputFilter {

}

@available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *)
public class InputFilter {
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public class InputFilter<D: DataProtocol> {

// Internal buffer to read bytes from a DataProtocol implementation
private class InputFilterBuffer<D: DataProtocol> {
private var _data: D // current input data
private var _remaining: Int // total bytes remaining to process in _data
private var _regionIndex: D.Regions.Index // region being read in _data
private var _regionRemaining: Int // remaining bytes to read in region being read in _data

public init(_ data: D) throws {
_data = data
_remaining = _data.count
_regionRemaining = 0
_regionIndex = _data.regions.startIndex
if _regionIndex != _data.regions.endIndex { _regionRemaining = _data.regions[_regionIndex].count }
// Advance to first non-zero region
try advance(by: 0)
}

// Return number of remaining bytes
public func remaining() -> Int { return _remaining }

// Call f with a buffer to the remaining bytes of the current contiguous region
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R? {
if _remaining == 0 {
return try body(UnsafeRawBufferPointer(start: nil, count: 0))
} else {
let r = _data.regions[_regionIndex]
return try r.withUnsafeBytes { (region_buf: UnsafeRawBufferPointer) in
let src_buf = UnsafeRawBufferPointer(
start: region_buf.baseAddress!.assumingMemoryBound(to: UInt8.self) + region_buf.count - _regionRemaining,
count: _regionRemaining)
return try body(src_buf)
}
}
}

// Consume n bytes in the current region (n can be 0, up to _regionRemaining), and move to next non-empty region if needed
// post-condition: _remaining == 0 || _regionRemaining > 0
public func advance(by n: Int) throws {

// Sanity checks
if n > _regionRemaining { throw FilterError.invalidState } // invalid n

// Update counters
_regionRemaining -= n
_remaining -= n

// Move to next non-empty region if we are done with the current one
let r = _data.regions
while _regionRemaining == 0 {
r.formIndex(after: &_regionIndex)
if _regionIndex == r.endIndex { break }
_regionRemaining = r[_regionIndex].count
}

// Sanity checks
if _remaining != 0 && _regionRemaining == 0 { throw FilterError.invalidState }
}
}

private let _readCapacity: Int // size to use when calling _readFunc
private let _readFunc: (Int) throws -> D? // caller-provided read function
private var _stream: compression_stream
private var _buf: Data? = nil // current input data
private let _bufCapacity: Int // size to read when refilling _buf
private let _readFunc: (Int) throws -> Data?
private var _buf: InputFilterBuffer<D>? = nil // input
private var _eofReached: Bool = false // did we read end-of-file from the input?
private var _endReached: Bool = false // did we reach end-of-file from the decoder stream?

Expand All @@ -219,15 +303,15 @@ public class InputFilter {
/// - bufferCapacity: capacity of the internal data buffer
/// - readFunc: called to read the input data
///
/// - Throws: `FilterError.filterInitError` if filter initialization failed
/// - Throws: `FilterError.invalidState` if filter initialization failed
public init(
_ operation: FilterOperation,
using algorithm: Algorithm,
bufferCapacity: Int = 65536,
readingFrom readFunc: @escaping (Int) throws -> Data?
readingFrom readFunc: @escaping (Int) throws -> D?
) throws {
_stream = try compression_stream(operation: operation, algorithm: algorithm)
_bufCapacity = bufferCapacity
_readCapacity = bufferCapacity
_readFunc = readFunc
}

Expand All @@ -243,7 +327,7 @@ public class InputFilter {
/// - Returns: a new Data object containing at most `count` output bytes, or nil if no more data is available
///
/// - Throws:
/// `FilterError.filterProcessError` if an error occurs during processing
/// `FilterError.invalidData` if an error occurs during processing
public func readData(ofLength count: Int) throws -> Data? {
// Sanity check
precondition(count > 0, "number of bytes to read can't be 0")
Expand All @@ -263,32 +347,37 @@ public class InputFilter {
while _stream.dst_size > 0 && !_endReached {

// Refill _buf if needed, and EOF was not yet read
if _stream.src_size == 0 && !_eofReached {
_buf = try _readFunc(_bufCapacity) // may be nil
// Reset src_size to full _buf size
if _buf?.count ?? 0 == 0 { _eofReached = true }
_stream.src_size = _buf?.count ?? 0
if (_buf?.remaining() ?? 0) == 0 && !_eofReached {
let data = try _readFunc(_readCapacity) // may be nil, or empty
if data?.count ?? 0 == 0 { // nil or empty -> EOF
_eofReached = true
_buf = nil
} else {
_buf = try InputFilterBuffer(data!)
}
}

// Process some data
if let buf = _buf {
try buf.withUnsafeBytes { (src_ptr: UnsafePointer<UInt8>) in

// Next byte to read
_stream.src_ptr = src_ptr + buf.count - _stream.src_size

try buf.withUnsafeBytes { (src_buf: UnsafeRawBufferPointer) in
// Point to buffer
_stream.src_ptr = src_buf.baseAddress!.assumingMemoryBound(to: UInt8.self)
_stream.src_size = src_buf.count
let status = compression_stream_process(&_stream, (_eofReached ? Int32(COMPRESSION_STREAM_FINALIZE.rawValue) : 0))
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.filterProcessError }
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.invalidData }
if status == COMPRESSION_STATUS_END { _endReached = true }
// Advance by the number of consumed bytes
let consumed = src_buf.count - _stream.src_size
try buf.advance(by: consumed)
}
}
else {
} else {
// No data available, process until END reached
let status = compression_stream_process(&_stream, (_eofReached ? Int32(COMPRESSION_STREAM_FINALIZE.rawValue) : 0))
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.filterProcessError }
guard status != COMPRESSION_STATUS_ERROR else { throw FilterError.invalidData }
if status == COMPRESSION_STATUS_END { _endReached = true }

}
}

} // _stream.dst_size > 0 && !_endReached

} // result.withUnsafeMutableBytes

Expand Down
6 changes: 3 additions & 3 deletions test/stdlib/Compression.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DataSource {

}

@available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *)
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
func ofiltercompress_ifilterdecompress(
_ contents: Data, using algo: Algorithm
) throws -> Bool {
Expand Down Expand Up @@ -61,7 +61,7 @@ func ofiltercompress_ifilterdecompress(
return contents == decompressed
}

@available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *)
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
func ifiltercompress_ofilterdecompress(
_ contents: Data, using algo: Algorithm
) throws -> Bool {
Expand Down Expand Up @@ -113,7 +113,7 @@ func randomString(withBlockLength n: Int) -> String {

let tests = TestSuite("Compression")

if #available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *) {
if #available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) {

do {
for blockLength in [0, 1, 2, 5, 10, 100] {
Expand Down