Skip to content

Improve segment pool test coverage #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/jvm/src/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ internal actual object SegmentPool {
* The number of hash buckets. This number needs to balance keeping the pool small and contention
* low. We use the number of processors rounded up to the nearest power of two. For example a
* machine with 6 cores will have 8 hash buckets.
*
* Visible for testing only.
*/
private val HASH_BUCKET_COUNT =
internal val HASH_BUCKET_COUNT =
Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1)

private val HASH_BUCKET_COUNT_L2 = (HASH_BUCKET_COUNT / 2).coerceAtLeast(1)
Expand All @@ -124,7 +126,8 @@ internal actual object SegmentPool {
else -> "4194304" // 4MB
}

private val SECOND_LEVEL_POOL_TOTAL_SIZE =
// visible for testing
internal val SECOND_LEVEL_POOL_TOTAL_SIZE =
System.getProperty("kotlinx.io.pool.size.bytes", DEFAULT_SECOND_LEVEL_POOL_TOTAL_SIZE)
.toIntOrNull()?.coerceAtLeast(0) ?: 0

Expand Down
112 changes: 112 additions & 0 deletions core/jvm/test/PoolingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

package kotlinx.io

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class PoolingTest {
Expand Down Expand Up @@ -40,4 +45,111 @@ class PoolingTest {
otherBuffer.clear()
assertTrue(poolSize < SegmentPool.byteCount)
}

@Test
fun segmentAcquisitionAndRelease() {
val secondTierSize = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE
val firstTierSize = SegmentPool.MAX_SIZE

// fill the pool by requiring max possible segments count and then
// releasing them all
val segments = mutableSetOf<Segment>()
var size = 0
while (size < secondTierSize + firstTierSize) {
val segment = SegmentPool.take()
size += segment.dataAsByteArray(false).size
segments.add(segment)
}
segments.forEach(SegmentPool::recycle)


// take the same number of segments again and check that nothing new was allocated
val segments2 = mutableSetOf<Segment>()
size = 0
while (size < secondTierSize + firstTierSize) {
val segment = SegmentPool.take()
size += segment.remainingCapacity
segments2.add(segment)
}
segments2.forEach(SegmentPool::recycle)

assertEquals(segments, segments2)
}

@Test
fun contendedUseTest() {
val threadsCount = Runtime.getRuntime().availableProcessors() * 3
assertTrue(threadsCount >= SegmentPool.HASH_BUCKET_COUNT)

val segmentsPerThread = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE / Segment.SIZE / threadsCount
val observedSegments = ConcurrentHashMap<Segment, Any>()

val pool = Executors.newFixedThreadPool(threadsCount)
repeat(threadsCount) {
pool.submit {
repeat(10000) {
val segments = mutableListOf<Segment>()
repeat(segmentsPerThread) {
val seg = SegmentPool.take()
segments.add(seg)
observedSegments[seg] = seg
}
segments.forEach { SegmentPool.recycle(it) }
}
}
}
pool.shutdown()
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)

val maxPoolSize =
(SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE + SegmentPool.MAX_SIZE * SegmentPool.HASH_BUCKET_COUNT) / Segment.SIZE
assertTrue(observedSegments.size <= maxPoolSize)
}

@Test
fun contendedUseWithMixedOperationsTest() {
val threadsCount = Runtime.getRuntime().availableProcessors() * 3
assertTrue(threadsCount >= SegmentPool.HASH_BUCKET_COUNT)

val segmentsPerThread = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE / Segment.SIZE / threadsCount
val observedSegments = ConcurrentHashMap<Segment, Any>()

val pool = Executors.newFixedThreadPool(threadsCount)
repeat(threadsCount) {
pool.submit {
repeat(10000) {
val segments = mutableListOf<Segment>()
repeat(segmentsPerThread * 2) {
when (segments.size) {
0 -> {
val seg = SegmentPool.take()
segments.add(seg)
observedSegments[seg] = seg
}
segmentsPerThread -> SegmentPool.recycle(segments.removeLast())
else -> {
val rnd = Random.nextDouble()
// More segments we have, higher the probability to return one of them back
if (rnd > segments.size.toDouble() / segmentsPerThread) {
SegmentPool.recycle(segments.removeLast())
} else {
val seg = SegmentPool.take()
segments.add(seg)
observedSegments[seg] = seg
}
}
}

}
segments.forEach { SegmentPool.recycle(it) }
}
}
}
pool.shutdown()
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)

val maxPoolSize =
(SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE + SegmentPool.MAX_SIZE * SegmentPool.HASH_BUCKET_COUNT) / Segment.SIZE
assertTrue(observedSegments.size <= maxPoolSize)
}
}