Skip to content

Commit fe2865b

Browse files
authored
Improve segment pool test coverage (#379)
* Improve segment pool test coverage * Add concurrent stress tests
1 parent 9db9bd6 commit fe2865b

File tree

2 files changed

+117
-2
lines changed

2 files changed

+117
-2
lines changed

core/jvm/src/SegmentPool.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,10 @@ internal actual object SegmentPool {
111111
* The number of hash buckets. This number needs to balance keeping the pool small and contention
112112
* low. We use the number of processors rounded up to the nearest power of two. For example a
113113
* machine with 6 cores will have 8 hash buckets.
114+
*
115+
* Visible for testing only.
114116
*/
115-
private val HASH_BUCKET_COUNT =
117+
internal val HASH_BUCKET_COUNT =
116118
Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 2 - 1)
117119

118120
private val HASH_BUCKET_COUNT_L2 = (HASH_BUCKET_COUNT / 2).coerceAtLeast(1)
@@ -124,7 +126,8 @@ internal actual object SegmentPool {
124126
else -> "4194304" // 4MB
125127
}
126128

127-
private val SECOND_LEVEL_POOL_TOTAL_SIZE =
129+
// visible for testing
130+
internal val SECOND_LEVEL_POOL_TOTAL_SIZE =
128131
System.getProperty("kotlinx.io.pool.size.bytes", DEFAULT_SECOND_LEVEL_POOL_TOTAL_SIZE)
129132
.toIntOrNull()?.coerceAtLeast(0) ?: 0
130133

core/jvm/test/PoolingTest.kt

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55

66
package kotlinx.io
77

8+
import java.util.concurrent.ConcurrentHashMap
9+
import java.util.concurrent.Executors
10+
import java.util.concurrent.TimeUnit
11+
import kotlin.random.Random
812
import kotlin.test.Test
13+
import kotlin.test.assertEquals
914
import kotlin.test.assertTrue
1015

1116
class PoolingTest {
@@ -40,4 +45,111 @@ class PoolingTest {
4045
otherBuffer.clear()
4146
assertTrue(poolSize < SegmentPool.byteCount)
4247
}
48+
49+
@Test
50+
fun segmentAcquisitionAndRelease() {
51+
val secondTierSize = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE
52+
val firstTierSize = SegmentPool.MAX_SIZE
53+
54+
// fill the pool by requiring max possible segments count and then
55+
// releasing them all
56+
val segments = mutableSetOf<Segment>()
57+
var size = 0
58+
while (size < secondTierSize + firstTierSize) {
59+
val segment = SegmentPool.take()
60+
size += segment.dataAsByteArray(false).size
61+
segments.add(segment)
62+
}
63+
segments.forEach(SegmentPool::recycle)
64+
65+
66+
// take the same number of segments again and check that nothing new was allocated
67+
val segments2 = mutableSetOf<Segment>()
68+
size = 0
69+
while (size < secondTierSize + firstTierSize) {
70+
val segment = SegmentPool.take()
71+
size += segment.remainingCapacity
72+
segments2.add(segment)
73+
}
74+
segments2.forEach(SegmentPool::recycle)
75+
76+
assertEquals(segments, segments2)
77+
}
78+
79+
@Test
80+
fun contendedUseTest() {
81+
val threadsCount = Runtime.getRuntime().availableProcessors() * 3
82+
assertTrue(threadsCount >= SegmentPool.HASH_BUCKET_COUNT)
83+
84+
val segmentsPerThread = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE / Segment.SIZE / threadsCount
85+
val observedSegments = ConcurrentHashMap<Segment, Any>()
86+
87+
val pool = Executors.newFixedThreadPool(threadsCount)
88+
repeat(threadsCount) {
89+
pool.submit {
90+
repeat(10000) {
91+
val segments = mutableListOf<Segment>()
92+
repeat(segmentsPerThread) {
93+
val seg = SegmentPool.take()
94+
segments.add(seg)
95+
observedSegments[seg] = seg
96+
}
97+
segments.forEach { SegmentPool.recycle(it) }
98+
}
99+
}
100+
}
101+
pool.shutdown()
102+
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
103+
104+
val maxPoolSize =
105+
(SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE + SegmentPool.MAX_SIZE * SegmentPool.HASH_BUCKET_COUNT) / Segment.SIZE
106+
assertTrue(observedSegments.size <= maxPoolSize)
107+
}
108+
109+
@Test
110+
fun contendedUseWithMixedOperationsTest() {
111+
val threadsCount = Runtime.getRuntime().availableProcessors() * 3
112+
assertTrue(threadsCount >= SegmentPool.HASH_BUCKET_COUNT)
113+
114+
val segmentsPerThread = SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE / Segment.SIZE / threadsCount
115+
val observedSegments = ConcurrentHashMap<Segment, Any>()
116+
117+
val pool = Executors.newFixedThreadPool(threadsCount)
118+
repeat(threadsCount) {
119+
pool.submit {
120+
repeat(10000) {
121+
val segments = mutableListOf<Segment>()
122+
repeat(segmentsPerThread * 2) {
123+
when (segments.size) {
124+
0 -> {
125+
val seg = SegmentPool.take()
126+
segments.add(seg)
127+
observedSegments[seg] = seg
128+
}
129+
segmentsPerThread -> SegmentPool.recycle(segments.removeLast())
130+
else -> {
131+
val rnd = Random.nextDouble()
132+
// More segments we have, higher the probability to return one of them back
133+
if (rnd > segments.size.toDouble() / segmentsPerThread) {
134+
SegmentPool.recycle(segments.removeLast())
135+
} else {
136+
val seg = SegmentPool.take()
137+
segments.add(seg)
138+
observedSegments[seg] = seg
139+
}
140+
}
141+
}
142+
143+
}
144+
segments.forEach { SegmentPool.recycle(it) }
145+
}
146+
}
147+
}
148+
pool.shutdown()
149+
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
150+
151+
val maxPoolSize =
152+
(SegmentPool.SECOND_LEVEL_POOL_TOTAL_SIZE + SegmentPool.MAX_SIZE * SegmentPool.HASH_BUCKET_COUNT) / Segment.SIZE
153+
assertTrue(observedSegments.size <= maxPoolSize)
154+
}
43155
}

0 commit comments

Comments
 (0)