Skip to content

Commit cabb263

Browse files
committed
Moved jitter to the cached supplier and addressed other comments.
1 parent 5a1b120 commit cabb263

File tree

3 files changed

+98
-63
lines changed

3 files changed

+98
-63
lines changed

utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@
1515

1616
package software.amazon.awssdk.utils.cache;
1717

18+
import static java.time.temporal.ChronoUnit.MINUTES;
19+
1820
import java.time.Duration;
1921
import java.time.Instant;
22+
import java.util.Random;
2023
import java.util.concurrent.TimeUnit;
2124
import java.util.concurrent.atomic.AtomicBoolean;
2225
import java.util.concurrent.locks.Lock;
2326
import java.util.concurrent.locks.ReentrantLock;
2427
import java.util.function.Supplier;
2528
import software.amazon.awssdk.annotations.SdkProtectedApi;
29+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
30+
import software.amazon.awssdk.utils.ComparableUtils;
2631
import software.amazon.awssdk.utils.SdkAutoCloseable;
2732
import software.amazon.awssdk.utils.Validate;
2833

@@ -44,6 +49,16 @@ public class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
4449
*/
4550
private static final Duration BLOCKING_REFRESH_MAX_WAIT = Duration.ofSeconds(5);
4651

52+
/**
53+
* Maximum amount of jitter to apply to the refresh result's prefetch time.
54+
*/
55+
private static final Duration MAX_PREFETCH_JITTER = Duration.ofMinutes(5);
56+
57+
/**
58+
* Random instance used for jittering refresh results.
59+
*/
60+
private static final Random JITTER_RANDOM = new Random();
61+
4762
/**
4863
* Used as a primitive form of rate limiting for the speed of our refreshes. This will make sure that the backing supplier has
4964
* a period of time to update the value when the {@link RefreshResult#staleTime()} arrives without getting called by every
@@ -62,6 +77,11 @@ public class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
6277
*/
6378
private final AtomicBoolean prefetchStrategyInitialized = new AtomicBoolean(false);
6479

80+
/**
81+
* The maximum amount of prefetch jitter allowed on this instance. Only differs for {@link #MAX_PREFETCH_JITTER} in testing.
82+
*/
83+
private final Duration maxPrefetchJitter;
84+
6585
/**
6686
* The value currently stored in this cache.
6787
*/
@@ -76,8 +96,9 @@ public class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
7696
private final Supplier<RefreshResult<T>> valueSupplier;
7797

7898
private CachedSupplier(Builder<T> builder) {
79-
this.valueSupplier = Validate.notNull(builder.supplier, "builder.supplier");
99+
this.valueSupplier = jitteredValueSupplier(Validate.notNull(builder.supplier, "builder.supplier"));
80100
this.prefetchStrategy = Validate.notNull(builder.prefetchStrategy, "builder.prefetchStrategy");
101+
this.maxPrefetchJitter = Validate.notNull(builder.maxPrefetchJitter, "builder.maxPrefetchJitter");
81102
}
82103

83104
/**
@@ -163,6 +184,45 @@ private void handleInterruptedException(String message, InterruptedException cau
163184
throw new IllegalStateException(message, cause);
164185
}
165186

187+
/**
188+
* Wrap a value supplier with one that jitters its prefetch time.
189+
*/
190+
private Supplier<RefreshResult<T>> jitteredValueSupplier(Supplier<RefreshResult<T>> supplier) {
191+
return () -> {
192+
RefreshResult<T> result = supplier.get();
193+
194+
if (result.prefetchTime() == null) {
195+
return result;
196+
}
197+
198+
Duration maxJitter = getMaxJitter(result);
199+
if (maxJitter.isZero()) {
200+
return result;
201+
}
202+
203+
long jitter = Math.abs(JITTER_RANDOM.nextLong() % maxJitter.toMillis());
204+
Instant newPrefetchTime = result.prefetchTime().plusMillis(jitter);
205+
return RefreshResult.builder(result.value())
206+
.prefetchTime(newPrefetchTime)
207+
.staleTime(result.staleTime())
208+
.build();
209+
};
210+
}
211+
212+
private Duration getMaxJitter(RefreshResult<T> result) {
213+
if (result.staleTime() == null) {
214+
return maxPrefetchJitter;
215+
}
216+
217+
Instant oneMinuteBeforeStale = result.staleTime().minus(1, MINUTES);
218+
if (!result.prefetchTime().isBefore(oneMinuteBeforeStale)) {
219+
return Duration.ZERO;
220+
}
221+
222+
Duration oneMinuteBeforeStaleDuration = Duration.between(result.prefetchTime(), oneMinuteBeforeStale);
223+
return ComparableUtils.minimum(oneMinuteBeforeStaleDuration, maxPrefetchJitter);
224+
}
225+
166226
/**
167227
* Free any resources consumed by the prefetch strategy this supplier is using.
168228
*/
@@ -177,6 +237,7 @@ public void close() {
177237
public static final class Builder<T> {
178238
private final Supplier<RefreshResult<T>> supplier;
179239
private PrefetchStrategy prefetchStrategy = new OneCallerBlocks();
240+
private Duration maxPrefetchJitter = MAX_PREFETCH_JITTER;
180241

181242
private Builder(Supplier<RefreshResult<T>> supplier) {
182243
this.supplier = supplier;
@@ -194,6 +255,15 @@ public Builder<T> prefetchStrategy(PrefetchStrategy prefetchStrategy) {
194255
return this;
195256
}
196257

258+
/**
259+
* The maximum amount of time the prefetch time from the configured supplier will be jittered.
260+
*/
261+
@SdkTestInternalApi
262+
Builder<T> maxPrefetchJitter(Duration maxPrefetchJitter) {
263+
this.maxPrefetchJitter = maxPrefetchJitter;
264+
return this;
265+
}
266+
197267
/**
198268
* Create a {@link CachedSupplier} using the current configuration of this builder.
199269
*/

utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,8 @@
1515

1616
package software.amazon.awssdk.utils.cache;
1717

18-
import static java.time.temporal.ChronoUnit.HOURS;
19-
import static java.time.temporal.ChronoUnit.MINUTES;
20-
2118
import java.time.Duration;
2219
import java.time.Instant;
23-
import java.util.Optional;
24-
import java.util.Random;
2520
import java.util.concurrent.ScheduledFuture;
2621
import java.util.concurrent.ScheduledThreadPoolExecutor;
2722
import java.util.concurrent.Semaphore;
@@ -57,11 +52,6 @@ public class NonBlocking implements CachedSupplier.PrefetchStrategy {
5752
*/
5853
private static final Semaphore CONCURRENT_REFRESH_LEASES = new Semaphore(MAX_CONCURRENT_REFRESHES);
5954

60-
/**
61-
* The {@link Random} instance used for calculating jitter of the background prefetches.
62-
*/
63-
private static final Random JITTER_RANDOM = new Random();
64-
6555
/**
6656
* Thread used to kick off refreshes during the prefetch window. This does not do the actual refreshing. That's left for
6757
* the {@link #EXECUTOR}.
@@ -105,11 +95,6 @@ public class NonBlocking implements CachedSupplier.PrefetchStrategy {
10595
*/
10696
private final AtomicReference<ScheduledFuture<?>> refreshTask = new AtomicReference<>();
10797

108-
/**
109-
* The minimum amount of time allowed between async refreshes, primarily adjustable for testing purposes.
110-
*/
111-
private final Duration minimumRefreshFrequency;
112-
11398
/**
11499
* Whether this strategy has been shutdown (and should stop doing background refreshes)
115100
*/
@@ -130,13 +115,7 @@ public class NonBlocking implements CachedSupplier.PrefetchStrategy {
130115
* performing the update.
131116
*/
132117
public NonBlocking(String asyncThreadName) {
133-
this(asyncThreadName, Duration.ofSeconds(60));
134-
}
135-
136-
@SdkTestInternalApi
137-
NonBlocking(String asyncThreadName, Duration minimumRefreshFrequency) {
138118
this.asyncThreadName = asyncThreadName + "-" + INSTANCE_NUMBER.getAndIncrement();
139-
this.minimumRefreshFrequency = minimumRefreshFrequency;
140119
}
141120

142121
@SdkTestInternalApi
@@ -160,54 +139,36 @@ public void prefetch(Runnable valueUpdater) {
160139
@Override
161140
public <T> RefreshResult<T> fetch(Supplier<RefreshResult<T>> supplier) {
162141
RefreshResult<T> result = supplier.get();
163-
if (result.staleTime() == null || result.prefetchTime() == null) {
164-
return result;
165-
}
166-
167-
getRefreshTime(result).ifPresent(this::schedulePrefetch);
142+
schedulePrefetch(result);
168143
return result;
169144
}
170145

171-
private Optional<Instant> getRefreshTime(RefreshResult<?> result) {
172-
Instant minStart = Instant.now().plus(minimumRefreshFrequency);
173-
Instant rangeStart = result.prefetchTime().isBefore(minStart) ? minStart : result.prefetchTime();
174-
175-
if (Duration.between(Instant.now(), rangeStart).toDays() > 7) {
176-
log.debug(() -> "Skipping background refresh because the prefetch time is too far in the future: " + rangeStart);
177-
return Optional.empty();
178-
}
179-
180-
Instant maxEnd = rangeStart.plus(1, HOURS);
181-
Instant rangeEnd = result.staleTime().isAfter(maxEnd) ? maxEnd : result.staleTime().minus(1, MINUTES);
182-
183-
if (rangeEnd.isBefore(rangeStart)) {
184-
return Optional.of(rangeStart);
146+
private void schedulePrefetch(RefreshResult<?> result) {
147+
if (shutdown || result.staleTime() == null || result.prefetchTime() == null) {
148+
return;
185149
}
186150

187-
return Optional.of(randomTimeBetween(rangeStart, rangeEnd));
188-
}
189-
190-
private Instant randomTimeBetween(Instant rangeStart, Instant rangeEnd) {
191-
Duration timeBetween = Duration.between(rangeStart, rangeEnd);
192-
return rangeStart.plusMillis(Math.abs(JITTER_RANDOM.nextLong() % timeBetween.toMillis()));
193-
}
194-
195-
private void schedulePrefetch(Instant refreshTime) {
196-
if (shutdown) {
151+
Duration timeUntilPrefetch = Duration.between(Instant.now(), result.prefetchTime());
152+
if (timeUntilPrefetch.isNegative() || timeUntilPrefetch.toDays() > 7) {
153+
log.debug(() -> "Skipping background refresh because the prefetch time is in the past or too far in the future: " +
154+
result.prefetchTime());
197155
return;
198156
}
199157

200-
Duration waitTime = Duration.between(Instant.now(), refreshTime);
201-
log.debug(() -> "Scheduling refresh attempt for " + refreshTime + " (in " + waitTime.toMillis() + " ms)");
158+
Instant backgroundRefreshTime = result.prefetchTime().plusSeconds(1);
159+
Duration timeUntilBackgroundRefresh = timeUntilPrefetch.plusSeconds(1);
160+
161+
log.debug(() -> "Scheduling refresh attempt for " + backgroundRefreshTime + " (in " +
162+
timeUntilBackgroundRefresh.toMillis() + " ms)");
202163

203164
ScheduledFuture<?> scheduledTask = SCHEDULER.schedule(() -> {
204165
runWithInstanceThreadName(() -> {
205-
log.debug(() -> "Executing refresh attempt scheduled for " + refreshTime);
166+
log.debug(() -> "Executing refresh attempt scheduled for " + backgroundRefreshTime);
206167

207168
// If the supplier has already been prefetched, this will just be a cache hit.
208169
tryRunBackgroundTask(cachedSupplier::get);
209170
});
210-
}, waitTime.toMillis(), TimeUnit.MILLISECONDS);
171+
}, timeUntilBackgroundRefresh.toMillis(), TimeUnit.MILLISECONDS);
211172

212173
updateTask(scheduledTask);
213174

utils/src/test/java/software/amazon/awssdk/utils/cache/CachedSupplierTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public void oneCallerBlocksPrefetchStrategyWorks() throws InterruptedException {
198198
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past())) {
199199
CachedSupplier<String> cachedSupplier = CachedSupplier.builder(waitingSupplier)
200200
.prefetchStrategy(new OneCallerBlocks())
201+
.maxPrefetchJitter(Duration.ZERO)
201202
.build();
202203

203204
// Perform one successful "get" to prime the cache.
@@ -225,6 +226,7 @@ public void nonBlockingPrefetchStrategyWorks() {
225226
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past());
226227
CachedSupplier<String> cachedSupplier = CachedSupplier.builder(waitingSupplier)
227228
.prefetchStrategy(new NonBlocking("test-%s"))
229+
.maxPrefetchJitter(Duration.ZERO)
228230
.build()) {
229231
// Perform one successful "get" to prime the cache.
230232
waitingSupplier.permits.release(1);
@@ -245,7 +247,8 @@ public void nonBlockingPrefetchStrategyWorks() {
245247
public void nonBlockingPrefetchStrategyRefreshesInBackground() {
246248
try (WaitingSupplier waitingSupplier = new WaitingSupplier(now().plusSeconds(62), now());
247249
CachedSupplier<String> cachedSupplier = CachedSupplier.builder(waitingSupplier)
248-
.prefetchStrategy(new NonBlocking("test-%s", Duration.ZERO))
250+
.prefetchStrategy(new NonBlocking("test-%s"))
251+
.maxPrefetchJitter(Duration.ZERO)
249252
.build()) {
250253
waitingSupplier.permits.release(2);
251254
cachedSupplier.get();
@@ -309,23 +312,23 @@ public void threadsAreSharedBetweenNonBlockingInstances() throws InterruptedExce
309312
CachedSupplier<String> supplier =
310313
CachedSupplier.builder(() -> RefreshResult.builder("foo")
311314
.prefetchTime(now())
312-
.staleTime(now())
315+
.staleTime(future())
313316
.build())
314-
.prefetchStrategy(new NonBlocking("test", Duration.ZERO))
317+
.prefetchStrategy(new NonBlocking("test"))
318+
.maxPrefetchJitter(Duration.ofMillis(10))
315319
.build();
316320
supplier.get();
317321
css.add(supplier);
318-
Thread.sleep(10);
319322
}
320323

321324
int maxActive = 0;
322-
for (int i = 0; i < 100; i++) {
325+
for (int i = 0; i < 1000; i++) {
323326
maxActive = Math.max(maxActive, NonBlocking.executor().getActiveCount());
324-
Thread.sleep(10);
327+
Thread.sleep(1);
325328
}
326329

327330
// Make sure we used less-than 99 to do the refreshes.
328-
assertThat(maxActive).isBetween(2, 99);
331+
assertThat(maxActive).isBetween(1, 99);
329332
} finally {
330333
css.forEach(CachedSupplier::close);
331334
}
@@ -345,7 +348,8 @@ public void activeThreadsHaveMaxCount() throws InterruptedException {
345348
.prefetchTime(now())
346349
.staleTime(now())
347350
.build();
348-
}).prefetchStrategy(new NonBlocking("test", Duration.ZERO))
351+
}).prefetchStrategy(new NonBlocking("test"))
352+
.maxPrefetchJitter(Duration.ZERO)
349353
.build();
350354
executor.submit(supplier::get);
351355
css.add(supplier);

0 commit comments

Comments
 (0)