Skip to content

Propagate byte read/write counts from unsafe buffer ops #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions core/api/kotlinx-io-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,16 @@ public final class kotlinx/io/unsafe/UnsafeBufferOperations {
public final fun iterate (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
public final fun moveToTail (Lkotlinx/io/Buffer;[BII)V
public static synthetic fun moveToTail$default (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[BIIILjava/lang/Object;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)V
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)V
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)I
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)I
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)I
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)I
}

public final class kotlinx/io/unsafe/UnsafeBufferOperationsJvmKt {
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)V
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)V
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)V
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)J
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)I
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)I
}

public final class kotlinx/io/unsafe/UnsafeBufferOperationsKt {
Expand Down
8 changes: 4 additions & 4 deletions core/api/kotlinx-io-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ final object kotlinx.io.unsafe/UnsafeBufferOperations { // kotlinx.io.unsafe/Uns
final fun moveToTail(kotlinx.io/Buffer, kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io.unsafe/UnsafeBufferOperations.moveToTail|moveToTail(kotlinx.io.Buffer;kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Unit>){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Long, kotlin/Function3<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Long, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Long;kotlin.Function3<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Long,kotlin.Unit>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
}

final val kotlinx.io.files/SystemFileSystem // kotlinx.io.files/SystemFileSystem|{}SystemFileSystem[0]
Expand Down
50 changes: 34 additions & 16 deletions core/common/src/unsafe/UnsafeBufferOperations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public object UnsafeBufferOperations {
* and data from the consumed prefix will be no longer available for read.
* If the operation does not consume anything, the action should return `0`.
* It's considered an error to return a negative value or a value exceeding the size of a readable range.
* This value will also be propagated as the function return value.
*
* If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer.
*
Expand All @@ -75,6 +76,8 @@ public object UnsafeBufferOperations {
* the best effort basis, meaning that there are no strong zero-copy guarantees
* and the copy will be created if it could not be omitted.
*
* @return Number of bytes consumed as returned by [readAction].
*
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
* the `endIndexExclusive - startIndexInclusive` value.
* @throws IllegalArgumentException when the [buffer] is empty.
Expand All @@ -84,14 +87,16 @@ public object UnsafeBufferOperations {
public inline fun readFromHead(
buffer: Buffer,
readAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
) {
): Int {
require(!buffer.exhausted()) { "Buffer is empty" }
val head = buffer.head!!
val bytesRead = readAction(head.dataAsByteArray(true), head.pos, head.limit)
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead == 0) return
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
if (bytesRead != 0) {
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
}
return bytesRead
}

/**
Expand All @@ -107,26 +112,31 @@ public object UnsafeBufferOperations {
* and data from the consumed prefix will be no longer available for read.
* If the operation does not consume anything, the action should return `0`.
* It's considered an error to return a negative value or a value exceeding the [Segment.size].
* This value will also be propagated as the function return value.
*
* Both [readAction] arguments are valid only within [readAction] scope,
* it's an error to store and reuse it later.
*
* If the buffer is empty, [IllegalArgumentException] will be thrown.
*
* @return Number of bytes consumed as returned by [readAction].
*
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
* the [Segment.size] value.
* @throws IllegalArgumentException when the [buffer] is empty.
*
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.readUleb128
*/
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int) {
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int): Int {
require(!buffer.exhausted()) { "Buffer is empty" }
val head = buffer.head!!
val bytesRead = readAction(SegmentReadContextImpl, head)
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead == 0) return
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
if (bytesRead != 0) {
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
}
return bytesRead
}

/**
Expand All @@ -146,13 +156,16 @@ public object UnsafeBufferOperations {
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
* If no data was written, `0` should be returned.
* It's an error to return a negative value or a value exceeding the size of the provided writeable range.
* This value will also be propagated as the function return value.
*
* If [writeAction] ends execution by throwing an exception, no data will be written to the buffer.
*
* The data array is passed to the [writeAction] directly from the buffer's internal storage without copying
* on the best-effort basis, meaning that there are no strong zero-copy guarantees
* and the copy will be created if it could not be omitted.
*
* @return Number of bytes written as returned by [writeAction].
*
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
* the `endIndexExclusive - startIndexInclusive` value.
Expand All @@ -162,7 +175,7 @@ public object UnsafeBufferOperations {
public inline fun writeToTail(
buffer: Buffer, minimumCapacity: Int,
writeAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
) {
): Int {
val tail = buffer.writableSegment(minimumCapacity)

val data = tail.dataAsByteArray(false)
Expand All @@ -175,7 +188,7 @@ public object UnsafeBufferOperations {
tail.writeBackData(data, bytesWritten)
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

check(bytesWritten in 0..tail.remainingCapacity) {
Expand All @@ -185,11 +198,12 @@ public object UnsafeBufferOperations {
tail.writeBackData(data, bytesWritten)
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}
if (tail.isEmpty()) {
buffer.recycleTail()
}
return bytesWritten
}

/**
Expand All @@ -208,10 +222,13 @@ public object UnsafeBufferOperations {
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
* If no data was written, `0` should be returned.
* It's an error to return a negative value or a value exceeding the [Segment.remainingCapacity].
* This value will also be propagated as the function return value.
*
* Both [writeAction] arguments are valid only within [writeAction] scope,
* it's an error to store and reuse it later.
*
* @return Number of bytes written as returned by [writeAction].
*
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
* the [Segment.remainingCapacity] value for the provided segment.
Expand All @@ -222,15 +239,15 @@ public object UnsafeBufferOperations {
buffer: Buffer,
minimumCapacity: Int,
writeAction: (SegmentWriteContext, Segment) -> Int
) {
): Int {
val tail = buffer.writableSegment(minimumCapacity)
val bytesWritten = writeAction(SegmentWriteContextImpl, tail)

// fast path
if (bytesWritten == minimumCapacity) {
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

check(bytesWritten in 0..tail.remainingCapacity) {
Expand All @@ -239,12 +256,13 @@ public object UnsafeBufferOperations {
if (bytesWritten != 0) {
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

if (tail.isEmpty()) {
buffer.recycleTail()
}
return bytesWritten
}

/**
Expand Down
20 changes: 13 additions & 7 deletions core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.test.fail

@OptIn(UnsafeIoApi::class)
class UnsafeBufferOperationsReadTest {
Expand All @@ -38,10 +39,11 @@ class UnsafeBufferOperationsReadTest {

val buffer = Buffer().apply { write(expectedData) }
for (idx in actualData.indices) {
UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
val read = UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
actualData[idx] = data[startIndex]
1
}
assertEquals(1, read)
assertEquals(actualData.size - idx - 1, buffer.size.toInt())
}
assertTrue(buffer.exhausted())
Expand All @@ -51,36 +53,40 @@ class UnsafeBufferOperationsReadTest {
@Test
fun readNothing() {
val buffer = Buffer().apply { writeInt(42) }
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
assertEquals(0, read1)
assertEquals(42, buffer.readInt())

buffer.writeInt(42)
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
assertEquals(0, read2)
assertEquals(42, buffer.readInt())
}

@Test
fun readEverything() {
val buffer = Buffer().apply { writeString("hello world") }
UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
endIndex - startIndex
}
assertEquals(11, read1)
assertTrue(buffer.exhausted())

buffer.writeString("hello world")
UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
assertEquals(11, read2)
assertTrue(buffer.exhausted())
}

@Test
fun readFromEmptyBuffer() {
val buffer = Buffer()
assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> fail() }
}

assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> fail() }
}
}

Expand Down
19 changes: 12 additions & 7 deletions core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ class UnsafeBufferOperationsWriteTest {
val data = "hello world".encodeToByteArray()

for (idx in data.indices) {
UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
writeable[pos] = data[idx]
1
}
assertEquals(1, written)
assertEquals(idx + 1, buffer.size.toInt())
}
assertEquals("hello world", buffer.readString())
Expand All @@ -51,10 +52,12 @@ class UnsafeBufferOperationsWriteTest {
fun writeNothing() {
val buffer = Buffer()

UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
val write1 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
assertEquals(0, write1)
assertTrue(buffer.exhausted())

UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
val write2 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
assertEquals(0, write2)
assertTrue(buffer.exhausted())

buffer.writeInt(42)
Expand All @@ -75,12 +78,13 @@ class UnsafeBufferOperationsWriteTest {
@Test
fun writeWholeBuffer() {
val buffer = Buffer()
UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
for (idx in from..<to) {
data[idx] = 42
}
to - from
}
assertEquals(Segment.SIZE, written)
assertEquals(Segment.SIZE, buffer.size.toInt())
assertArrayEquals(ByteArray(Segment.SIZE) { 42 }, buffer.readByteArray())
}
Expand All @@ -89,25 +93,26 @@ class UnsafeBufferOperationsWriteTest {
fun writeWithCtx() {
val buffer = Buffer()

UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
ctx.setUnchecked(segment, 0, 1)
ctx.setUnchecked(segment, 1, 2)
2
}

assertEquals(2, written)
assertArrayEquals(byteArrayOf(1, 2), buffer.readByteArray())
}

@Test
fun requireToManyBytes() {
val buffer = Buffer()
assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> 0 }
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> fail() }
}
assertTrue(buffer.exhausted())

assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> 0 }
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> fail() }
}
assertTrue(buffer.exhausted())
}
Expand Down
Loading