Skip to content

Commit d7b0533

Browse files
authored
Propagate byte read/write counts from unsafe buffer ops (#364)
* Propagate byte read/write counts from unsafe buffer ops * Include JVM-specific extensions as well This allows the caller to more easily perform their own bookkeeping based on the amount of bytes moved. Closes #360
1 parent ecb1d38 commit d7b0533

9 files changed

+118
-67
lines changed

core/api/kotlinx-io-core.api

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -310,16 +310,16 @@ public final class kotlinx/io/unsafe/UnsafeBufferOperations {
310310
public final fun iterate (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
311311
public final fun moveToTail (Lkotlinx/io/Buffer;[BII)V
312312
public static synthetic fun moveToTail$default (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[BIIILjava/lang/Object;)V
313-
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
314-
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)V
315-
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)V
316-
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)V
313+
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)I
314+
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)I
315+
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)I
316+
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)I
317317
}
318318

319319
public final class kotlinx/io/unsafe/UnsafeBufferOperationsJvmKt {
320-
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)V
321-
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)V
322-
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)V
320+
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)J
321+
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)I
322+
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)I
323323
}
324324

325325
public final class kotlinx/io/unsafe/UnsafeBufferOperationsKt {

core/api/kotlinx-io-core.klib.api

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ final object kotlinx.io.unsafe/UnsafeBufferOperations { // kotlinx.io.unsafe/Uns
213213
final fun moveToTail(kotlinx.io/Buffer, kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io.unsafe/UnsafeBufferOperations.moveToTail|moveToTail(kotlinx.io.Buffer;kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0]
214214
final inline fun iterate(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Unit>){}[0]
215215
final inline fun iterate(kotlinx.io/Buffer, kotlin/Long, kotlin/Function3<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Long, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Long;kotlin.Function3<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Long,kotlin.Unit>){}[0]
216-
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
217-
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
218-
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
219-
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
216+
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
217+
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
218+
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
219+
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
220220
}
221221

222222
final val kotlinx.io.files/SystemFileSystem // kotlinx.io.files/SystemFileSystem|{}SystemFileSystem[0]

core/common/src/unsafe/UnsafeBufferOperations.kt

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public object UnsafeBufferOperations {
6666
* and data from the consumed prefix will be no longer available for read.
6767
* If the operation does not consume anything, the action should return `0`.
6868
* It's considered an error to return a negative value or a value exceeding the size of a readable range.
69+
* This value will also be propagated as the function return value.
6970
*
7071
* If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer.
7172
*
@@ -75,6 +76,8 @@ public object UnsafeBufferOperations {
7576
* the best effort basis, meaning that there are no strong zero-copy guarantees
7677
* and the copy will be created if it could not be omitted.
7778
*
79+
* @return Number of bytes consumed as returned by [readAction].
80+
*
7881
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
7982
* the `endIndexExclusive - startIndexInclusive` value.
8083
* @throws IllegalArgumentException when the [buffer] is empty.
@@ -84,14 +87,16 @@ public object UnsafeBufferOperations {
8487
public inline fun readFromHead(
8588
buffer: Buffer,
8689
readAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
87-
) {
90+
): Int {
8891
require(!buffer.exhausted()) { "Buffer is empty" }
8992
val head = buffer.head!!
9093
val bytesRead = readAction(head.dataAsByteArray(true), head.pos, head.limit)
91-
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
92-
if (bytesRead == 0) return
93-
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
94-
buffer.skip(bytesRead.toLong())
94+
if (bytesRead != 0) {
95+
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
96+
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
97+
buffer.skip(bytesRead.toLong())
98+
}
99+
return bytesRead
95100
}
96101

97102
/**
@@ -107,26 +112,31 @@ public object UnsafeBufferOperations {
107112
* and data from the consumed prefix will be no longer available for read.
108113
* If the operation does not consume anything, the action should return `0`.
109114
* It's considered an error to return a negative value or a value exceeding the [Segment.size].
115+
* This value will also be propagated as the function return value.
110116
*
111117
* Both [readAction] arguments are valid only within [readAction] scope,
112118
* it's an error to store and reuse it later.
113119
*
114120
* If the buffer is empty, [IllegalArgumentException] will be thrown.
115121
*
122+
* @return Number of bytes consumed as returned by [readAction].
123+
*
116124
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
117125
* the [Segment.size] value.
118126
* @throws IllegalArgumentException when the [buffer] is empty.
119127
*
120128
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.readUleb128
121129
*/
122-
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int) {
130+
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int): Int {
123131
require(!buffer.exhausted()) { "Buffer is empty" }
124132
val head = buffer.head!!
125133
val bytesRead = readAction(SegmentReadContextImpl, head)
126-
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
127-
if (bytesRead == 0) return
128-
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
129-
buffer.skip(bytesRead.toLong())
134+
if (bytesRead != 0) {
135+
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
136+
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
137+
buffer.skip(bytesRead.toLong())
138+
}
139+
return bytesRead
130140
}
131141

132142
/**
@@ -146,13 +156,16 @@ public object UnsafeBufferOperations {
146156
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
147157
* If no data was written, `0` should be returned.
148158
* It's an error to return a negative value or a value exceeding the size of the provided writeable range.
159+
* This value will also be propagated as the function return value.
149160
*
150161
* If [writeAction] ends execution by throwing an exception, no data will be written to the buffer.
151162
*
152163
* The data array is passed to the [writeAction] directly from the buffer's internal storage without copying
153164
* on the best-effort basis, meaning that there are no strong zero-copy guarantees
154165
* and the copy will be created if it could not be omitted.
155166
*
167+
* @return Number of bytes written as returned by [writeAction].
168+
*
156169
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
157170
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
158171
* the `endIndexExclusive - startIndexInclusive` value.
@@ -162,7 +175,7 @@ public object UnsafeBufferOperations {
162175
public inline fun writeToTail(
163176
buffer: Buffer, minimumCapacity: Int,
164177
writeAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
165-
) {
178+
): Int {
166179
val tail = buffer.writableSegment(minimumCapacity)
167180

168181
val data = tail.dataAsByteArray(false)
@@ -175,7 +188,7 @@ public object UnsafeBufferOperations {
175188
tail.writeBackData(data, bytesWritten)
176189
tail.limit += bytesWritten
177190
buffer.sizeMut += bytesWritten
178-
return
191+
return bytesWritten
179192
}
180193

181194
check(bytesWritten in 0..tail.remainingCapacity) {
@@ -185,11 +198,12 @@ public object UnsafeBufferOperations {
185198
tail.writeBackData(data, bytesWritten)
186199
tail.limit += bytesWritten
187200
buffer.sizeMut += bytesWritten
188-
return
201+
return bytesWritten
189202
}
190203
if (tail.isEmpty()) {
191204
buffer.recycleTail()
192205
}
206+
return bytesWritten
193207
}
194208

195209
/**
@@ -208,10 +222,13 @@ public object UnsafeBufferOperations {
208222
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
209223
* If no data was written, `0` should be returned.
210224
* It's an error to return a negative value or a value exceeding the [Segment.remainingCapacity].
225+
* This value will also be propagated as the function return value.
211226
*
212227
* Both [writeAction] arguments are valid only within [writeAction] scope,
213228
* it's an error to store and reuse it later.
214229
*
230+
* @return Number of bytes written as returned by [writeAction].
231+
*
215232
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
216233
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
217234
* the [Segment.remainingCapacity] value for the provided segment.
@@ -222,15 +239,15 @@ public object UnsafeBufferOperations {
222239
buffer: Buffer,
223240
minimumCapacity: Int,
224241
writeAction: (SegmentWriteContext, Segment) -> Int
225-
) {
242+
): Int {
226243
val tail = buffer.writableSegment(minimumCapacity)
227244
val bytesWritten = writeAction(SegmentWriteContextImpl, tail)
228245

229246
// fast path
230247
if (bytesWritten == minimumCapacity) {
231248
tail.limit += bytesWritten
232249
buffer.sizeMut += bytesWritten
233-
return
250+
return bytesWritten
234251
}
235252

236253
check(bytesWritten in 0..tail.remainingCapacity) {
@@ -239,12 +256,13 @@ public object UnsafeBufferOperations {
239256
if (bytesWritten != 0) {
240257
tail.limit += bytesWritten
241258
buffer.sizeMut += bytesWritten
242-
return
259+
return bytesWritten
243260
}
244261

245262
if (tail.isEmpty()) {
246263
buffer.recycleTail()
247264
}
265+
return bytesWritten
248266
}
249267

250268
/**

core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import kotlin.test.Test
1313
import kotlin.test.assertEquals
1414
import kotlin.test.assertFailsWith
1515
import kotlin.test.assertTrue
16+
import kotlin.test.fail
1617

1718
@OptIn(UnsafeIoApi::class)
1819
class UnsafeBufferOperationsReadTest {
@@ -38,10 +39,11 @@ class UnsafeBufferOperationsReadTest {
3839

3940
val buffer = Buffer().apply { write(expectedData) }
4041
for (idx in actualData.indices) {
41-
UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
42+
val read = UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
4243
actualData[idx] = data[startIndex]
4344
1
4445
}
46+
assertEquals(1, read)
4547
assertEquals(actualData.size - idx - 1, buffer.size.toInt())
4648
}
4749
assertTrue(buffer.exhausted())
@@ -51,36 +53,40 @@ class UnsafeBufferOperationsReadTest {
5153
@Test
5254
fun readNothing() {
5355
val buffer = Buffer().apply { writeInt(42) }
54-
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
56+
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
57+
assertEquals(0, read1)
5558
assertEquals(42, buffer.readInt())
5659

5760
buffer.writeInt(42)
58-
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
61+
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
62+
assertEquals(0, read2)
5963
assertEquals(42, buffer.readInt())
6064
}
6165

6266
@Test
6367
fun readEverything() {
6468
val buffer = Buffer().apply { writeString("hello world") }
65-
UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
69+
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
6670
endIndex - startIndex
6771
}
72+
assertEquals(11, read1)
6873
assertTrue(buffer.exhausted())
6974

7075
buffer.writeString("hello world")
71-
UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
76+
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
77+
assertEquals(11, read2)
7278
assertTrue(buffer.exhausted())
7379
}
7480

7581
@Test
7682
fun readFromEmptyBuffer() {
7783
val buffer = Buffer()
7884
assertFailsWith<IllegalArgumentException> {
79-
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
85+
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> fail() }
8086
}
8187

8288
assertFailsWith<IllegalArgumentException> {
83-
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
89+
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> fail() }
8490
}
8591
}
8692

core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ class UnsafeBufferOperationsWriteTest {
3838
val data = "hello world".encodeToByteArray()
3939

4040
for (idx in data.indices) {
41-
UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
41+
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
4242
writeable[pos] = data[idx]
4343
1
4444
}
45+
assertEquals(1, written)
4546
assertEquals(idx + 1, buffer.size.toInt())
4647
}
4748
assertEquals("hello world", buffer.readString())
@@ -51,10 +52,12 @@ class UnsafeBufferOperationsWriteTest {
5152
fun writeNothing() {
5253
val buffer = Buffer()
5354

54-
UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
55+
val write1 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
56+
assertEquals(0, write1)
5557
assertTrue(buffer.exhausted())
5658

57-
UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
59+
val write2 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
60+
assertEquals(0, write2)
5861
assertTrue(buffer.exhausted())
5962

6063
buffer.writeInt(42)
@@ -75,12 +78,13 @@ class UnsafeBufferOperationsWriteTest {
7578
@Test
7679
fun writeWholeBuffer() {
7780
val buffer = Buffer()
78-
UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
81+
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
7982
for (idx in from..<to) {
8083
data[idx] = 42
8184
}
8285
to - from
8386
}
87+
assertEquals(Segment.SIZE, written)
8488
assertEquals(Segment.SIZE, buffer.size.toInt())
8589
assertArrayEquals(ByteArray(Segment.SIZE) { 42 }, buffer.readByteArray())
8690
}
@@ -89,25 +93,26 @@ class UnsafeBufferOperationsWriteTest {
8993
fun writeWithCtx() {
9094
val buffer = Buffer()
9195

92-
UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
96+
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
9397
ctx.setUnchecked(segment, 0, 1)
9498
ctx.setUnchecked(segment, 1, 2)
9599
2
96100
}
97101

102+
assertEquals(2, written)
98103
assertArrayEquals(byteArrayOf(1, 2), buffer.readByteArray())
99104
}
100105

101106
@Test
102107
fun requireToManyBytes() {
103108
val buffer = Buffer()
104109
assertFailsWith<IllegalArgumentException> {
105-
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> 0 }
110+
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> fail() }
106111
}
107112
assertTrue(buffer.exhausted())
108113

109114
assertFailsWith<IllegalArgumentException> {
110-
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> 0 }
115+
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> fail() }
111116
}
112117
assertTrue(buffer.exhausted())
113118
}

0 commit comments

Comments
 (0)