Skip to content

Commit 6cfa6fc

Browse files
Mike Pedersenzoewangg
andauthored
Added option of using an explicit ExecutorService in FileAsyncResponseTransformer (#3875)
Co-authored-by: Zoe Wang <[email protected]>
1 parent b57d499 commit 6cfa6fc

File tree

4 files changed

+179
-6
lines changed

4 files changed

+179
-6
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "mpdn",
5+
"description": "Added option of using an explicit `ExecutorService` in `FileAsyncResponseTransformer`"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515

1616
package software.amazon.awssdk.core;
1717

18+
import java.nio.channels.AsynchronousChannelGroup;
1819
import java.nio.file.FileAlreadyExistsException;
1920
import java.nio.file.Path;
21+
import java.util.Objects;
22+
import java.util.Optional;
23+
import java.util.concurrent.ExecutorService;
2024
import software.amazon.awssdk.annotations.SdkPublicApi;
2125
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2226
import software.amazon.awssdk.utils.Validate;
@@ -36,10 +40,12 @@ public final class FileTransformerConfiguration implements ToCopyableBuilder<Fil
3640
FileTransformerConfiguration> {
3741
private final FileWriteOption fileWriteOption;
3842
private final FailureBehavior failureBehavior;
43+
private final ExecutorService executorService;
3944

4045
private FileTransformerConfiguration(DefaultBuilder builder) {
4146
this.fileWriteOption = Validate.paramNotNull(builder.fileWriteOption, "fileWriteOption");
4247
this.failureBehavior = Validate.paramNotNull(builder.failureBehavior, "failureBehavior");
48+
this.executorService = builder.executorService;
4349
}
4450

4551
/**
@@ -56,6 +62,16 @@ public FailureBehavior failureBehavior() {
5662
return failureBehavior;
5763
}
5864

65+
/**
66+
* The configured {@link ExecutorService} the writes should be executed on.
67+
* <p>
68+
* If not set, the default thread pool defined by the underlying {@link java.nio.file.spi.FileSystemProvider} will be used.
69+
* This will typically be the thread pool defined by the {@link AsynchronousChannelGroup}.
70+
*/
71+
public Optional<ExecutorService> executorService() {
72+
return Optional.ofNullable(executorService);
73+
}
74+
5975
/**
6076
* Create a {@link Builder}, used to create a {@link FileTransformerConfiguration}.
6177
*/
@@ -118,13 +134,17 @@ public boolean equals(Object o) {
118134
if (fileWriteOption != that.fileWriteOption) {
119135
return false;
120136
}
121-
return failureBehavior == that.failureBehavior;
137+
if (failureBehavior != that.failureBehavior) {
138+
return false;
139+
}
140+
return Objects.equals(executorService, that.executorService);
122141
}
123142

124143
@Override
125144
public int hashCode() {
126145
int result = fileWriteOption != null ? fileWriteOption.hashCode() : 0;
127146
result = 31 * result + (failureBehavior != null ? failureBehavior.hashCode() : 0);
147+
result = 31 * result + (executorService != null ? executorService.hashCode() : 0);
128148
return result;
129149
}
130150

@@ -181,18 +201,28 @@ public interface Builder extends CopyableBuilder<Builder, FileTransformerConfigu
181201
* @return This object for method chaining.
182202
*/
183203
Builder failureBehavior(FailureBehavior failureBehavior);
204+
205+
/**
206+
* Configures the {@link ExecutorService} the writes should be executed on.
207+
*
208+
* @param executorService the executor service to use, or null if using the default thread pool.
209+
* @return This object for method chaining.
210+
*/
211+
Builder executorService(ExecutorService executorService);
184212
}
185213

186-
private static class DefaultBuilder implements Builder {
214+
private static final class DefaultBuilder implements Builder {
187215
private FileWriteOption fileWriteOption;
188216
private FailureBehavior failureBehavior;
217+
private ExecutorService executorService;
189218

190219
private DefaultBuilder() {
191220
}
192221

193222
private DefaultBuilder(FileTransformerConfiguration fileTransformerConfiguration) {
194223
this.fileWriteOption = fileTransformerConfiguration.fileWriteOption;
195224
this.failureBehavior = fileTransformerConfiguration.failureBehavior;
225+
this.executorService = fileTransformerConfiguration.executorService;
196226
}
197227

198228
@Override
@@ -207,6 +237,12 @@ public Builder failureBehavior(FailureBehavior failureBehavior) {
207237
return this;
208238
}
209239

240+
@Override
241+
public Builder executorService(ExecutorService executorService) {
242+
this.executorService = executorService;
243+
return this;
244+
}
245+
210246
@Override
211247
public FileTransformerConfiguration build() {
212248
return new FileTransformerConfiguration(this);

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@
2424
import java.nio.channels.CompletionHandler;
2525
import java.nio.file.Files;
2626
import java.nio.file.NoSuchFileException;
27+
import java.nio.file.OpenOption;
2728
import java.nio.file.Path;
2829
import java.nio.file.StandardOpenOption;
30+
import java.util.Collections;
31+
import java.util.HashSet;
32+
import java.util.Set;
2933
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ExecutorService;
3035
import java.util.concurrent.atomic.AtomicLong;
3136
import java.util.function.Consumer;
3237
import org.reactivestreams.Subscriber;
@@ -78,17 +83,24 @@ private long determineFilePositionToWrite(Path path) {
7883
}
7984

8085
private AsynchronousFileChannel createChannel(Path path) throws IOException {
86+
Set<OpenOption> options = new HashSet<>();
8187
switch (configuration.fileWriteOption()) {
8288
case CREATE_OR_APPEND_TO_EXISTING:
83-
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
89+
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
90+
break;
8491
case CREATE_OR_REPLACE_EXISTING:
85-
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
86-
StandardOpenOption.TRUNCATE_EXISTING);
92+
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
93+
StandardOpenOption.TRUNCATE_EXISTING);
94+
break;
8795
case CREATE_NEW:
88-
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
96+
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
97+
break;
8998
default:
9099
throw new IllegalArgumentException("Unsupported file write option: " + configuration.fileWriteOption());
91100
}
101+
102+
ExecutorService executorService = configuration.executorService().orElse(null);
103+
return AsynchronousFileChannel.open(path, options, executorService);
92104
}
93105

94106
@Override

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,16 @@
3131
import java.nio.file.Path;
3232
import java.util.ArrayList;
3333
import java.util.Arrays;
34+
import java.util.Collection;
3435
import java.util.List;
36+
import java.util.concurrent.Callable;
3537
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.Executors;
41+
import java.util.concurrent.Future;
3642
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.TimeoutException;
3744
import org.apache.commons.lang3.RandomStringUtils;
3845
import org.junit.jupiter.api.AfterEach;
3946
import org.junit.jupiter.api.BeforeEach;
@@ -213,6 +220,32 @@ private static List<FileTransformerConfiguration> configurations() {
213220
.failureBehavior(LEAVE).build());
214221
}
215222

223+
@Test
224+
void explicitExecutor_shouldUseExecutor() throws Exception {
225+
Path testPath = testFs.getPath("test_file.txt");
226+
assertThat(testPath).doesNotExist();
227+
String newContent = RandomStringUtils.randomAlphanumeric(2000);
228+
229+
ExecutorService executor = Executors.newSingleThreadExecutor();
230+
try {
231+
SpyingExecutorService spyingExecutorService = new SpyingExecutorService(executor);
232+
FileTransformerConfiguration configuration = FileTransformerConfiguration
233+
.builder()
234+
.fileWriteOption(FileWriteOption.CREATE_NEW)
235+
.failureBehavior(DELETE)
236+
.executorService(spyingExecutorService)
237+
.build();
238+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration);
239+
240+
stubSuccessfulStreaming(newContent, transformer);
241+
assertThat(testPath).hasContent(newContent);
242+
assertThat(spyingExecutorService.hasReceivedTasks()).isTrue();
243+
} finally {
244+
executor.shutdown();
245+
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
246+
}
247+
}
248+
216249
private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
217250
CompletableFuture<String> future = transformer.prepare();
218251
transformer.onResponse("foobar");
@@ -240,4 +273,90 @@ private static void stubException(String newContent, FileAsyncResponseTransforme
240273
private static SdkPublisher<ByteBuffer> testPublisher(String content) {
241274
return SdkPublisher.adapt(Flowable.just(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
242275
}
276+
277+
private static final class SpyingExecutorService implements ExecutorService {
278+
private final ExecutorService executorService;
279+
private boolean receivedTasks = false;
280+
281+
private SpyingExecutorService(ExecutorService executorService) {
282+
this.executorService = executorService;
283+
}
284+
285+
public boolean hasReceivedTasks() {
286+
return receivedTasks;
287+
}
288+
289+
@Override
290+
public void shutdown() {
291+
executorService.shutdown();
292+
}
293+
294+
@Override
295+
public List<Runnable> shutdownNow() {
296+
return executorService.shutdownNow();
297+
}
298+
299+
@Override
300+
public boolean isShutdown() {
301+
return executorService.isShutdown();
302+
}
303+
304+
@Override
305+
public boolean isTerminated() {
306+
return executorService.isTerminated();
307+
}
308+
309+
@Override
310+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
311+
return executorService.awaitTermination(timeout, unit);
312+
}
313+
314+
@Override
315+
public <T> Future<T> submit(Callable<T> task) {
316+
receivedTasks = true;
317+
return executorService.submit(task);
318+
}
319+
320+
@Override
321+
public <T> Future<T> submit(Runnable task, T result) {
322+
receivedTasks = true;
323+
return executorService.submit(task, result);
324+
}
325+
326+
@Override
327+
public Future<?> submit(Runnable task) {
328+
receivedTasks = true;
329+
return executorService.submit(task);
330+
}
331+
332+
@Override
333+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
334+
receivedTasks = true;
335+
return executorService.invokeAll(tasks);
336+
}
337+
338+
@Override
339+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
340+
receivedTasks = true;
341+
return executorService.invokeAll(tasks, timeout, unit);
342+
}
343+
344+
@Override
345+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
346+
receivedTasks = true;
347+
return executorService.invokeAny(tasks);
348+
}
349+
350+
@Override
351+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
352+
receivedTasks = true;
353+
return executorService.invokeAny(tasks, timeout, unit);
354+
}
355+
356+
@Override
357+
public void execute(Runnable command) {
358+
receivedTasks = true;
359+
executorService.execute(command);
360+
}
361+
}
243362
}

0 commit comments

Comments
 (0)