18
18
19
19
import static com .google .cloud .spanner .MetricRegistryConstants .COUNT ;
20
20
import static com .google .cloud .spanner .MetricRegistryConstants .GET_SESSION_TIMEOUTS ;
21
+ import static com .google .cloud .spanner .MetricRegistryConstants .IS_MULTIPLEXED ;
21
22
import static com .google .cloud .spanner .MetricRegistryConstants .MAX_ALLOWED_SESSIONS ;
22
23
import static com .google .cloud .spanner .MetricRegistryConstants .MAX_ALLOWED_SESSIONS_DESCRIPTION ;
23
24
import static com .google .cloud .spanner .MetricRegistryConstants .MAX_IN_USE_SESSIONS ;
@@ -1402,7 +1403,7 @@ PooledSession get(final boolean eligibleForLongRunning) {
1402
1403
res .markBusy (span );
1403
1404
span .addAnnotation ("Using Session" , "sessionId" , res .getName ());
1404
1405
synchronized (lock ) {
1405
- incrementNumSessionsInUse ();
1406
+ incrementNumSessionsInUse (false );
1406
1407
checkedOutSessions .add (this );
1407
1408
}
1408
1409
res .eligibleForLongRunning = eligibleForLongRunning ;
@@ -2162,9 +2163,15 @@ enum Position {
2162
2163
@ GuardedBy ("lock" )
2163
2164
private long numSessionsAcquired = 0 ;
2164
2165
2166
+ @ GuardedBy ("lock" )
2167
+ private long numMultiplexedSessionsAcquired = 0 ;
2168
+
2165
2169
@ GuardedBy ("lock" )
2166
2170
private long numSessionsReleased = 0 ;
2167
2171
2172
+ @ GuardedBy ("lock" )
2173
+ private long numMultiplexedSessionsReleased = 0 ;
2174
+
2168
2175
@ GuardedBy ("lock" )
2169
2176
private long numIdleSessionsRemoved = 0 ;
2170
2177
@@ -2175,6 +2182,7 @@ enum Position {
2175
2182
private long numLeakedSessionsRemoved = 0 ;
2176
2183
2177
2184
private AtomicLong numWaiterTimeouts = new AtomicLong ();
2185
+ private AtomicLong numMultiplexedSessionWaiterTimeouts = new AtomicLong ();
2178
2186
2179
2187
@ GuardedBy ("lock" )
2180
2188
private final Set <PooledSession > allSessions = new HashSet <>();
@@ -2184,15 +2192,12 @@ enum Position {
2184
2192
final Set <PooledSessionFuture > checkedOutSessions = new HashSet <>();
2185
2193
2186
2194
private final SessionConsumer sessionConsumer = new SessionConsumerImpl ();
2187
-
2188
2195
@ VisibleForTesting Function <PooledSession , Void > idleSessionRemovedListener ;
2189
2196
2190
2197
@ VisibleForTesting Function <PooledSession , Void > longRunningSessionRemovedListener ;
2191
-
2192
2198
private final CountDownLatch waitOnMinSessionsLatch ;
2193
2199
private final SessionReplacementHandler pooledSessionReplacementHandler =
2194
2200
new PooledSessionReplacementHandler ();
2195
-
2196
2201
/**
2197
2202
* Create a session pool with the given options and for the given database. It will also start
2198
2203
* eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
@@ -2424,6 +2429,10 @@ long getNumWaiterTimeouts() {
2424
2429
return numWaiterTimeouts .get ();
2425
2430
}
2426
2431
2432
+ long getNumMultiplexedSessionWaiterTimeouts () {
2433
+ return numMultiplexedSessionWaiterTimeouts .get ();
2434
+ }
2435
+
2427
2436
private void initPool () {
2428
2437
synchronized (lock ) {
2429
2438
poolMaintainer .init ();
@@ -2557,12 +2566,16 @@ private PooledSessionFuture checkoutSession(
2557
2566
return res ;
2558
2567
}
2559
2568
2560
- private void incrementNumSessionsInUse () {
2569
+ private void incrementNumSessionsInUse (boolean isMultiplexed ) {
2561
2570
synchronized (lock ) {
2562
- if (maxSessionsInUse < ++numSessionsInUse ) {
2563
- maxSessionsInUse = numSessionsInUse ;
2571
+ if (!isMultiplexed ) {
2572
+ if (maxSessionsInUse < ++numSessionsInUse ) {
2573
+ maxSessionsInUse = numSessionsInUse ;
2574
+ }
2575
+ numSessionsAcquired ++;
2576
+ } else {
2577
+ numMultiplexedSessionsAcquired ++;
2564
2578
}
2565
- numSessionsAcquired ++;
2566
2579
}
2567
2580
}
2568
2581
@@ -3139,13 +3152,25 @@ private void initOpenTelemetryMetricsCollection(
3139
3152
measurement .record (this .sessions .size (), attributesAvailableSessions );
3140
3153
});
3141
3154
3155
+ AttributesBuilder attributesBuilderIsMultiplexed ;
3156
+ if (attributes != null ) {
3157
+ attributesBuilderIsMultiplexed = attributes .toBuilder ();
3158
+ } else {
3159
+ attributesBuilderIsMultiplexed = Attributes .builder ();
3160
+ }
3161
+ Attributes attributesRegularSession =
3162
+ attributesBuilderIsMultiplexed .put (IS_MULTIPLEXED , false ).build ();
3163
+ Attributes attributesMultiplexedSession =
3164
+ attributesBuilderIsMultiplexed .put (IS_MULTIPLEXED , true ).build ();
3142
3165
meter
3143
3166
.counterBuilder (GET_SESSION_TIMEOUTS )
3144
3167
.setDescription (SESSIONS_TIMEOUTS_DESCRIPTION )
3145
3168
.setUnit (COUNT )
3146
3169
.buildWithCallback (
3147
3170
measurement -> {
3148
- measurement .record (this .getNumWaiterTimeouts (), attributes );
3171
+ measurement .record (this .getNumWaiterTimeouts (), attributesRegularSession );
3172
+ measurement .record (
3173
+ this .getNumMultiplexedSessionWaiterTimeouts (), attributesMultiplexedSession );
3149
3174
});
3150
3175
3151
3176
meter
@@ -3154,7 +3179,8 @@ private void initOpenTelemetryMetricsCollection(
3154
3179
.setUnit (COUNT )
3155
3180
.buildWithCallback (
3156
3181
measurement -> {
3157
- measurement .record (this .numSessionsAcquired , attributes );
3182
+ measurement .record (this .numSessionsAcquired , attributesRegularSession );
3183
+ measurement .record (this .numMultiplexedSessionsAcquired , attributesMultiplexedSession );
3158
3184
});
3159
3185
3160
3186
meter
@@ -3163,7 +3189,8 @@ private void initOpenTelemetryMetricsCollection(
3163
3189
.setUnit (COUNT )
3164
3190
.buildWithCallback (
3165
3191
measurement -> {
3166
- measurement .record (this .numSessionsReleased , attributes );
3192
+ measurement .record (this .numSessionsReleased , attributesRegularSession );
3193
+ measurement .record (this .numMultiplexedSessionsReleased , attributesMultiplexedSession );
3167
3194
});
3168
3195
}
3169
3196
}
0 commit comments