-
Notifications
You must be signed in to change notification settings - Fork 132
fix: avoid unbalanced session pool creation #2442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 25 commits
bb09e1b
c2ef5b1
8a44f2b
75773fd
913fa8a
4130c8c
6c48990
f8637bb
be9fca2
9be0abe
f260eca
8939c7e
80ded1b
8a84488
8346c5c
7a6e536
5e3d68d
ed25c99
bc406f4
104b36e
f5095f3
6885745
86909e3
8e7ea3a
c6b1542
30d7edc
51320ab
3c6060a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ | |
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; | ||
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; | ||
import com.google.cloud.spanner.SpannerImpl.ClosedException; | ||
import com.google.cloud.spanner.spi.v1.SpannerRpc; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Function; | ||
import com.google.common.base.MoreObjects; | ||
|
@@ -1372,6 +1373,13 @@ final class PooledSession implements Session { | |
private volatile SpannerException lastException; | ||
private volatile boolean allowReplacing = true; | ||
|
||
/** | ||
* This ensures that the session is added at a random position in the pool the first time it is | ||
* actually added to the pool. | ||
*/ | ||
@GuardedBy("lock") | ||
private Position releaseToPosition = initialReleasePosition; | ||
|
||
/** | ||
* Property to mark if the session is eligible to be long-running. This can only be true if the | ||
* session is executing certain types of transactions (for ex - Partitioned DML) which can be | ||
|
@@ -1403,6 +1411,13 @@ private PooledSession(SessionImpl delegate) { | |
this.lastUseTime = clock.instant(); | ||
} | ||
|
||
int getChannel() { | ||
Long channelHint = (Long) delegate.getOptions().get(SpannerRpc.Option.CHANNEL_HINT); | ||
return channelHint == null | ||
? 0 | ||
: (int) (channelHint % sessionClient.getSpanner().getOptions().getNumChannels()); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return getName(); | ||
|
@@ -1536,7 +1551,7 @@ public void close() { | |
if (state != SessionState.CLOSING) { | ||
state = SessionState.AVAILABLE; | ||
} | ||
releaseSession(this, Position.FIRST); | ||
releaseSession(this, false); | ||
} | ||
} | ||
|
||
|
@@ -1576,7 +1591,7 @@ private void determineDialectAsync(final SettableFuture<Dialect> dialect) { | |
// in the database dialect, and there's nothing sensible that we can do with it here. | ||
dialect.setException(t); | ||
} finally { | ||
releaseSession(this, Position.FIRST); | ||
releaseSession(this, false); | ||
} | ||
}); | ||
} | ||
|
@@ -1830,7 +1845,7 @@ private void keepAliveSessions(Instant currTime) { | |
logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.getName()); | ||
numSessionsToKeepAlive--; | ||
sessionToKeepAlive.keepAlive(); | ||
releaseSession(sessionToKeepAlive, Position.FIRST); | ||
releaseSession(sessionToKeepAlive, false); | ||
} catch (SpannerException e) { | ||
handleException(e, sessionToKeepAlive); | ||
} | ||
|
@@ -1929,7 +1944,7 @@ private void removeLongRunningSessions( | |
} | ||
} | ||
|
||
private enum Position { | ||
enum Position { | ||
FIRST, | ||
RANDOM | ||
} | ||
|
@@ -1962,6 +1977,15 @@ private enum Position { | |
|
||
final PoolMaintainer poolMaintainer; | ||
private final Clock clock; | ||
/** | ||
* initialReleasePosition determines where in the pool sessions are added when they are released | ||
* into the pool the first time. This is always RANDOM in production, but some tests use FIRST to | ||
* be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an | ||
* unbalanced session pool where all sessions belonging to one gRPC channel are added to the same | ||
* region in the pool. | ||
*/ | ||
private final Position initialReleasePosition; | ||
|
||
private final Object lock = new Object(); | ||
private final Random random = new Random(); | ||
|
||
|
@@ -2045,6 +2069,7 @@ static SessionPool createPool( | |
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(), | ||
sessionClient, | ||
poolMaintainerClock == null ? new Clock() : poolMaintainerClock, | ||
Position.RANDOM, | ||
Metrics.getMetricRegistry(), | ||
labelValues); | ||
} | ||
|
@@ -2053,20 +2078,22 @@ static SessionPool createPool( | |
SessionPoolOptions poolOptions, | ||
ExecutorFactory<ScheduledExecutorService> executorFactory, | ||
SessionClient sessionClient) { | ||
return createPool(poolOptions, executorFactory, sessionClient, new Clock()); | ||
return createPool(poolOptions, executorFactory, sessionClient, new Clock(), Position.RANDOM); | ||
} | ||
|
||
static SessionPool createPool( | ||
SessionPoolOptions poolOptions, | ||
ExecutorFactory<ScheduledExecutorService> executorFactory, | ||
SessionClient sessionClient, | ||
Clock clock) { | ||
Clock clock, | ||
Position initialReleasePosition) { | ||
return createPool( | ||
poolOptions, | ||
null, | ||
executorFactory, | ||
sessionClient, | ||
clock, | ||
initialReleasePosition, | ||
Metrics.getMetricRegistry(), | ||
SPANNER_DEFAULT_LABEL_VALUES); | ||
} | ||
|
@@ -2077,6 +2104,7 @@ static SessionPool createPool( | |
ExecutorFactory<ScheduledExecutorService> executorFactory, | ||
SessionClient sessionClient, | ||
Clock clock, | ||
Position initialReleasePosition, | ||
MetricRegistry metricRegistry, | ||
List<LabelValue> labelValues) { | ||
SessionPool pool = | ||
|
@@ -2087,6 +2115,7 @@ static SessionPool createPool( | |
executorFactory.get(), | ||
sessionClient, | ||
clock, | ||
initialReleasePosition, | ||
metricRegistry, | ||
labelValues); | ||
pool.initPool(); | ||
|
@@ -2100,6 +2129,7 @@ private SessionPool( | |
ScheduledExecutorService executor, | ||
SessionClient sessionClient, | ||
Clock clock, | ||
Position initialReleasePosition, | ||
MetricRegistry metricRegistry, | ||
List<LabelValue> labelValues) { | ||
this.options = options; | ||
|
@@ -2108,6 +2138,7 @@ private SessionPool( | |
this.executor = executor; | ||
this.sessionClient = sessionClient; | ||
this.clock = clock; | ||
this.initialReleasePosition = initialReleasePosition; | ||
this.poolMaintainer = new PoolMaintainer(); | ||
this.initMetricsCollection(metricRegistry, labelValues); | ||
this.waitOnMinSessionsLatch = | ||
|
@@ -2233,7 +2264,7 @@ private void handleException(SpannerException e, PooledSession session) { | |
if (isSessionNotFound(e)) { | ||
invalidateSession(session); | ||
} else { | ||
releaseSession(session, Position.FIRST); | ||
releaseSession(session, false); | ||
} | ||
} | ||
|
||
|
@@ -2396,18 +2427,35 @@ private void maybeCreateSession() { | |
} | ||
} | ||
} | ||
|
||
/** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */ | ||
private void releaseSession(PooledSession session, Position position) { | ||
private void releaseSession(PooledSession session, boolean isNewSession) { | ||
Preconditions.checkNotNull(session); | ||
synchronized (lock) { | ||
if (closureFuture != null) { | ||
return; | ||
} | ||
if (waiters.size() == 0) { | ||
// No pending waiters | ||
switch (position) { | ||
// There are no pending waiters. | ||
// Add to a random position if the head of the session pool already contains many sessions | ||
// with the same channel as this one. | ||
if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) { | ||
session.releaseToPosition = Position.RANDOM; | ||
} else if (session.releaseToPosition == Position.RANDOM | ||
&& !isNewSession | ||
&& checkedOutSessions.size() <= 2) { | ||
// Do not randomize if there are few other sessions checked out and this session has been | ||
// used. This ensures that this session will be re-used for the next transaction, which is | ||
// more efficient. | ||
session.releaseToPosition = Position.FIRST; | ||
} | ||
switch (session.releaseToPosition) { | ||
case RANDOM: | ||
if (!sessions.isEmpty()) { | ||
// A session should only be added at a random position the first time it is added to | ||
// the pool. All following releases into the pool should happen at the front of the | ||
// pool. | ||
session.releaseToPosition = Position.FIRST; | ||
int pos = random.nextInt(sessions.size() + 1); | ||
sessions.add(pos, session); | ||
break; | ||
|
@@ -2423,6 +2471,48 @@ private void releaseSession(PooledSession session, Position position) { | |
} | ||
} | ||
|
||
private boolean isUnbalanced(PooledSession session) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, good point. I've changed it a bit so we have a static method that does the actual calculation. That method is easier to test than one that depends on all the state fields in the SessionPool class. And I've added different tests for that method. |
||
// Don't bother with any randomization if the number of checked out sessions is low, as it is | ||
arpan14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// better to re-use sessions as much as possible in a low-QPS scenario. | ||
if (sessions.isEmpty() || checkedOutSessions.size() <= 2) { | ||
return false; | ||
} | ||
int numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); | ||
arpan14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (numChannels == 1) { | ||
return false; | ||
} | ||
|
||
// Ideally, the first numChannels sessions in the pool should contain exactly one session for | ||
// each channel. | ||
// Check if the first numChannels sessions at the head of the pool already contain more than 2 | ||
// sessions that use the same channel as this one. If so, we randomize. | ||
int channel = session.getChannel(); | ||
int count = 0; | ||
for (int i = 0; i < Math.min(numChannels, sessions.size()); i++) { | ||
PooledSession otherSession = sessions.get(i); | ||
if (channel == otherSession.getChannel()) { | ||
count++; | ||
if (count > 2) { | ||
return true; | ||
} | ||
} | ||
} | ||
// Ideally, the use of a channel in the checked out sessions is exactly | ||
// numCheckedOut / numChannels | ||
// We check whether we are more than a factor two away from that perfect distribution. | ||
arpan14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If we are, then we randomize. | ||
int checkedOutThreshold = Math.max(2, 2 * checkedOutSessions.size() / numChannels); | ||
for (PooledSessionFuture otherSession : checkedOutSessions) { | ||
if (otherSession.isDone() && channel == otherSession.get().getChannel()) { | ||
count++; | ||
if (count > checkedOutThreshold) { | ||
return true; | ||
} | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
private void handleCreateSessionsFailure(SpannerException e, int count) { | ||
synchronized (lock) { | ||
for (int i = 0; i < count; i++) { | ||
|
@@ -2622,7 +2712,7 @@ public void onSessionReady(SessionImpl session) { | |
// Release the session to a random position in the pool to prevent the case that a batch | ||
// of sessions that are affiliated with the same channel are all placed sequentially in | ||
// the pool. | ||
releaseSession(pooledSession, Position.RANDOM); | ||
releaseSession(pooledSession, true); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks a bit hacky that we are mutating
session.releaseToPosition
within the switch case. Instead of doing this within the switch case, should we add this condition as a branch at L2451?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that adding it as a branch at L2451 would work. The basic logic is:
RANDOM
, and if it is, then it should be reset toFIRST
so it is not randomized the next time.I've changed the switch case to an if-else, as we currently only support
FIRST
andRANDOM
, so a switch does not really make sense. I think that makes the logic a bit clearer as well.