Skip to content

fix: avoid unbalanced session pool creation #2442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
bb09e1b
fix: avoid unbalanced session pool creation
olavloite May 19, 2023
c2ef5b1
fix: automatically balance pool
olavloite May 19, 2023
8a44f2b
fix: skip empty pool
olavloite May 19, 2023
75773fd
fix: shuffle if unbalanced
olavloite May 19, 2023
913fa8a
fix: only reset randomness if actually randomized
olavloite May 20, 2023
4130c8c
test: randomize if many sessions are checked out
olavloite May 20, 2023
6c48990
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 20, 2023
f8637bb
test: try with more channels
olavloite May 22, 2023
be9fca2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 22, 2023
9be0abe
Merge branch 'main' into randomize-session-position-at-first-add
olavloite May 31, 2023
f260eca
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 31, 2023
8939c7e
fix: also consider checked out sessions for unbalanced pool
olavloite Jun 1, 2023
80ded1b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 1, 2023
8a84488
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Jul 31, 2023
8346c5c
docs: add javadoc for property
olavloite Jul 31, 2023
7a6e536
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 31, 2023
5e3d68d
perf: optimize low-QPS workloads
olavloite Aug 1, 2023
ed25c99
test: only randomize if more than 2 sessions are checked out
olavloite Aug 2, 2023
bc406f4
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 2, 2023
104b36e
test: only skip randomization for existing sessions
olavloite Aug 2, 2023
f5095f3
Merge branch 'randomize-session-position-at-first-add' of github.com:…
olavloite Aug 2, 2023
6885745
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Aug 29, 2023
86909e3
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 29, 2023
8e7ea3a
chore: run formatter
olavloite Aug 29, 2023
c6b1542
Merge branch 'randomize-session-position-at-first-add' of github.com:…
olavloite Aug 29, 2023
30d7edc
chore: address review comments
olavloite Sep 7, 2023
51320ab
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Sep 7, 2023
3c6060a
docs: update comment on how session is added to the pool
olavloite Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1372,6 +1373,13 @@ final class PooledSession implements Session {
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

/**
* This ensures that the session is added at a random position in the pool the first time it is
* actually added to the pool.
*/
@GuardedBy("lock")
private Position releaseToPosition = initialReleasePosition;

/**
* Property to mark if the session is eligible to be long-running. This can only be true if the
* session is executing certain types of transactions (for ex - Partitioned DML) which can be
Expand Down Expand Up @@ -1403,6 +1411,13 @@ private PooledSession(SessionImpl delegate) {
this.lastUseTime = clock.instant();
}

int getChannel() {
Long channelHint = (Long) delegate.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
return channelHint == null
? 0
: (int) (channelHint % sessionClient.getSpanner().getOptions().getNumChannels());
}

@Override
public String toString() {
return getName();
Expand Down Expand Up @@ -1536,7 +1551,7 @@ public void close() {
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
}
releaseSession(this, Position.FIRST);
releaseSession(this, false);
}
}

Expand Down Expand Up @@ -1576,7 +1591,7 @@ private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
// in the database dialect, and there's nothing sensible that we can do with it here.
dialect.setException(t);
} finally {
releaseSession(this, Position.FIRST);
releaseSession(this, false);
}
});
}
Expand Down Expand Up @@ -1830,7 +1845,7 @@ private void keepAliveSessions(Instant currTime) {
logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.getName());
numSessionsToKeepAlive--;
sessionToKeepAlive.keepAlive();
releaseSession(sessionToKeepAlive, Position.FIRST);
releaseSession(sessionToKeepAlive, false);
} catch (SpannerException e) {
handleException(e, sessionToKeepAlive);
}
Expand Down Expand Up @@ -1929,7 +1944,7 @@ private void removeLongRunningSessions(
}
}

private enum Position {
enum Position {
FIRST,
RANDOM
}
Expand Down Expand Up @@ -1962,6 +1977,15 @@ private enum Position {

final PoolMaintainer poolMaintainer;
private final Clock clock;
/**
* initialReleasePosition determines where in the pool sessions are added when they are released
* into the pool the first time. This is always RANDOM in production, but some tests use FIRST to
* be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an
* unbalanced session pool where all sessions belonging to one gRPC channel are added to the same
* region in the pool.
*/
private final Position initialReleasePosition;

private final Object lock = new Object();
private final Random random = new Random();

Expand Down Expand Up @@ -2045,6 +2069,7 @@ static SessionPool createPool(
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Position.RANDOM,
Metrics.getMetricRegistry(),
labelValues);
}
Expand All @@ -2053,20 +2078,22 @@ static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient) {
return createPool(poolOptions, executorFactory, sessionClient, new Clock());
return createPool(poolOptions, executorFactory, sessionClient, new Clock(), Position.RANDOM);
}

static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock) {
Clock clock,
Position initialReleasePosition) {
return createPool(
poolOptions,
null,
executorFactory,
sessionClient,
clock,
initialReleasePosition,
Metrics.getMetricRegistry(),
SPANNER_DEFAULT_LABEL_VALUES);
}
Expand All @@ -2077,6 +2104,7 @@ static SessionPool createPool(
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
List<LabelValue> labelValues) {
SessionPool pool =
Expand All @@ -2087,6 +2115,7 @@ static SessionPool createPool(
executorFactory.get(),
sessionClient,
clock,
initialReleasePosition,
metricRegistry,
labelValues);
pool.initPool();
Expand All @@ -2100,6 +2129,7 @@ private SessionPool(
ScheduledExecutorService executor,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
List<LabelValue> labelValues) {
this.options = options;
Expand All @@ -2108,6 +2138,7 @@ private SessionPool(
this.executor = executor;
this.sessionClient = sessionClient;
this.clock = clock;
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
this.initMetricsCollection(metricRegistry, labelValues);
this.waitOnMinSessionsLatch =
Expand Down Expand Up @@ -2233,7 +2264,7 @@ private void handleException(SpannerException e, PooledSession session) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else {
releaseSession(session, Position.FIRST);
releaseSession(session, false);
}
}

Expand Down Expand Up @@ -2396,18 +2427,35 @@ private void maybeCreateSession() {
}
}
}

/** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
private void releaseSession(PooledSession session, Position position) {
private void releaseSession(PooledSession session, boolean isNewSession) {
Preconditions.checkNotNull(session);
synchronized (lock) {
if (closureFuture != null) {
return;
}
if (waiters.size() == 0) {
// No pending waiters
switch (position) {
// There are no pending waiters.
// Add to a random position if the head of the session pool already contains many sessions
// with the same channel as this one.
if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.RANDOM
&& !isNewSession
&& checkedOutSessions.size() <= 2) {
// Do not randomize if there are few other sessions checked out and this session has been
// used. This ensures that this session will be re-used for the next transaction, which is
// more efficient.
session.releaseToPosition = Position.FIRST;
}
switch (session.releaseToPosition) {
case RANDOM:
if (!sessions.isEmpty()) {
// A session should only be added at a random position the first time it is added to
// the pool. All following releases into the pool should happen at the front of the
// pool.
session.releaseToPosition = Position.FIRST;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a bit hacky that we are mutating session.releaseToPosition within the switch case. Instead of doing this within the switch case, should we add this condition as a branch at L2451?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that adding it as a branch at L2451 would work. The basic logic is:

  1. The session should normally only be added to the pool at a random position once.
  2. So we need an if/switch case that determines whether it is currently RANDOM, and if it is, then it should be reset to FIRST so it is not randomized the next time.

I've changed the switch case to an if-else, as we currently only support FIRST and RANDOM, so a switch does not really make sense. I think that makes the logic a bit clearer as well.

int pos = random.nextInt(sessions.size() + 1);
sessions.add(pos, session);
break;
Expand All @@ -2423,6 +2471,48 @@ private void releaseSession(PooledSession session, Position position) {
}
}

private boolean isUnbalanced(PooledSession session) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @VisibleForTesting and add a few more unit tests for this method? It's doing a bunch of things which is currently untested with the existing tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. I've changed it a bit so we have a static method that does the actual calculation. That method is easier to test than one that depends on all the state fields in the SessionPool class. And I've added different tests for that method.

// Don't bother with any randomization if the number of checked out sessions is low, as it is
// better to re-use sessions as much as possible in a low-QPS scenario.
if (sessions.isEmpty() || checkedOutSessions.size() <= 2) {
return false;
}
int numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
if (numChannels == 1) {
return false;
}

// Ideally, the first numChannels sessions in the pool should contain exactly one session for
// each channel.
// Check if the first numChannels sessions at the head of the pool already contain more than 2
// sessions that use the same channel as this one. If so, we randomize.
int channel = session.getChannel();
int count = 0;
for (int i = 0; i < Math.min(numChannels, sessions.size()); i++) {
PooledSession otherSession = sessions.get(i);
if (channel == otherSession.getChannel()) {
count++;
if (count > 2) {
return true;
}
}
}
// Ideally, the use of a channel in the checked out sessions is exactly
// numCheckedOut / numChannels
// We check whether we are more than a factor two away from that perfect distribution.
// If we are, then we randomize.
int checkedOutThreshold = Math.max(2, 2 * checkedOutSessions.size() / numChannels);
for (PooledSessionFuture otherSession : checkedOutSessions) {
if (otherSession.isDone() && channel == otherSession.get().getChannel()) {
count++;
if (count > checkedOutThreshold) {
return true;
}
}
}
return false;
}

private void handleCreateSessionsFailure(SpannerException e, int count) {
synchronized (lock) {
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -2622,7 +2712,7 @@ public void onSessionReady(SessionImpl session) {
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
releaseSession(pooledSession, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.protobuf.Empty;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.threeten.bp.Instant;

abstract class BaseSessionPoolTest {
ScheduledExecutorService mockExecutor;
int sessionIndex;
AtomicLong channelHint = new AtomicLong(0L);

final class TestExecutorFactory implements ExecutorFactory<ScheduledExecutorService> {

Expand Down Expand Up @@ -64,6 +69,9 @@ public void release(ScheduledExecutorService executor) {
@SuppressWarnings("unchecked")
SessionImpl mockSession() {
final SessionImpl session = mock(SessionImpl.class);
Map options = new HashMap<>();
options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement());
when(session.getOptions()).thenReturn(options);
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -58,6 +59,7 @@ public void setUp() {
initMocks(this);
when(client.getOptions()).thenReturn(spannerOptions);
when(client.getSessionClient(db)).thenReturn(sessionClient);
when(sessionClient.getSpanner()).thenReturn(client);
when(spannerOptions.getNumChannels()).thenReturn(4);
when(spannerOptions.getDatabaseRole()).thenReturn("role");
setupMockSessionCreation();
Expand Down Expand Up @@ -111,9 +113,11 @@ private SessionImpl setupMockSession(final SessionImpl session) {
}

private SessionPool createPool() throws Exception {
// Allow sessions to be added to the head of the pool in all cases in this test, as it is
// otherwise impossible to know which session exactly is getting pinged at what point in time.
SessionPool pool =
SessionPool.createPool(
options, new TestExecutorFactory(), client.getSessionClient(db), clock);
options, new TestExecutorFactory(), client.getSessionClient(db), clock, Position.FIRST);
pool.idleSessionRemovedListener =
input -> {
idledSessions.add(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction;
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
Expand Down Expand Up @@ -92,6 +93,7 @@ private void setupSpanner(DatabaseId db) {
when(spannerOptions.getNumChannels()).thenReturn(4);
when(spannerOptions.getDatabaseRole()).thenReturn("role");
SessionClient sessionClient = mock(SessionClient.class);
when(sessionClient.getSpanner()).thenReturn(mockSpanner);
when(mockSpanner.getSessionClient(db)).thenReturn(sessionClient);
when(mockSpanner.getOptions()).thenReturn(spannerOptions);
doAnswer(
Expand Down Expand Up @@ -226,7 +228,11 @@ public void stressTest() throws Exception {
}
pool =
SessionPool.createPool(
builder.build(), new TestExecutorFactory(), mockSpanner.getSessionClient(db), clock);
builder.build(),
new TestExecutorFactory(),
mockSpanner.getSessionClient(db),
clock,
Position.RANDOM);
pool.idleSessionRemovedListener =
pooled -> {
String name = pooled.getName();
Expand Down
Loading