Skip to content

Commit 85a8f89

Browse files
authored
FileAsyncResponseTransformer - write to position (#5241)
add WRITE_TO_POSITION option for FileAsyncResponseTransformer
1 parent 1660397 commit 85a8f89

File tree

4 files changed

+176
-34
lines changed

4 files changed

+176
-34
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ExecutorService;
2424
import software.amazon.awssdk.annotations.SdkPublicApi;
2525
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
26+
import software.amazon.awssdk.utils.ToString;
2627
import software.amazon.awssdk.utils.Validate;
2728
import software.amazon.awssdk.utils.builder.CopyableBuilder;
2829
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
@@ -41,11 +42,19 @@ public final class FileTransformerConfiguration implements ToCopyableBuilder<Fil
4142
private final FileWriteOption fileWriteOption;
4243
private final FailureBehavior failureBehavior;
4344
private final ExecutorService executorService;
45+
private final Long position;
4446

4547
private FileTransformerConfiguration(DefaultBuilder builder) {
4648
this.fileWriteOption = Validate.paramNotNull(builder.fileWriteOption, "fileWriteOption");
4749
this.failureBehavior = Validate.paramNotNull(builder.failureBehavior, "failureBehavior");
4850
this.executorService = builder.executorService;
51+
this.position = builder.position;
52+
if (fileWriteOption != FileWriteOption.WRITE_TO_POSITION && position != null) {
53+
throw new IllegalArgumentException(String.format(
54+
"'position' can only be used with 'WRITE_TO_POSITION' file write option, but was used with '%s'",
55+
fileWriteOption
56+
));
57+
}
4958
}
5059

5160
/**
@@ -72,6 +81,18 @@ public Optional<ExecutorService> executorService() {
7281
return Optional.ofNullable(executorService);
7382
}
7483

84+
/**
85+
* Exclusively used with {@link FileWriteOption#WRITE_TO_POSITION}. Configures the position, where to start writing to the
86+
* existing file. The location correspond to the first byte where new data will be written. For example, if {@code 128} is
87+
* configured, bytes 0-127 of the existing file will remain untouched and data will be written starting at byte 128. If not
88+
* specified, defaults to 0.
89+
*
90+
* @return The offset at which to start overwriting data in the file.
91+
*/
92+
public Long position() {
93+
return position;
94+
}
95+
7596
/**
7697
* Create a {@link Builder}, used to create a {@link FileTransformerConfiguration}.
7798
*/
@@ -137,6 +158,9 @@ public boolean equals(Object o) {
137158
if (failureBehavior != that.failureBehavior) {
138159
return false;
139160
}
161+
if (!Objects.equals(position, that.position)) {
162+
return false;
163+
}
140164
return Objects.equals(executorService, that.executorService);
141165
}
142166

@@ -145,6 +169,7 @@ public int hashCode() {
145169
int result = fileWriteOption != null ? fileWriteOption.hashCode() : 0;
146170
result = 31 * result + (failureBehavior != null ? failureBehavior.hashCode() : 0);
147171
result = 31 * result + (executorService != null ? executorService.hashCode() : 0);
172+
result = 31 * result + (position != null ? position.hashCode() : 0);
148173
return result;
149174
}
150175

@@ -165,7 +190,15 @@ public enum FileWriteOption {
165190
/**
166191
* Create a new file if it doesn't exist, otherwise append to the existing file.
167192
*/
168-
CREATE_OR_APPEND_TO_EXISTING
193+
CREATE_OR_APPEND_TO_EXISTING,
194+
195+
/**
196+
* Write to an existing file at the specified position, defined by the {@link FileTransformerConfiguration#position()}. If
197+
* the file does not exist, a {@link java.nio.file.NoSuchFileException} will be thrown. If
198+
* {@link FileTransformerConfiguration#position()} is not configured, start overwriting data at the beginning of the file
199+
* (byte 0).
200+
*/
201+
WRITE_TO_POSITION
169202
}
170203

171204
/**
@@ -209,12 +242,24 @@ public interface Builder extends CopyableBuilder<Builder, FileTransformerConfigu
209242
* @return This object for method chaining.
210243
*/
211244
Builder executorService(ExecutorService executorService);
245+
246+
/**
247+
* Exclusively used with {@link FileWriteOption#WRITE_TO_POSITION}. Configures the position, where to start writing to the
248+
* existing file. The location correspond to the first byte where new data will be written. For example, if {@code 128} is
249+
* configured, bytes 0-127 of the existing file will remain untouched and data will be written starting at byte 128. If
250+
* not specified, defaults to 0.
251+
*
252+
* @param writePosition the position at where to start writing data to the file.
253+
* @return This object for method chaining.
254+
*/
255+
Builder position(Long writePosition);
212256
}
213257

214258
private static final class DefaultBuilder implements Builder {
215259
private FileWriteOption fileWriteOption;
216260
private FailureBehavior failureBehavior;
217261
private ExecutorService executorService;
262+
private Long position;
218263

219264
private DefaultBuilder() {
220265
}
@@ -223,6 +268,7 @@ private DefaultBuilder(FileTransformerConfiguration fileTransformerConfiguration
223268
this.fileWriteOption = fileTransformerConfiguration.fileWriteOption;
224269
this.failureBehavior = fileTransformerConfiguration.failureBehavior;
225270
this.executorService = fileTransformerConfiguration.executorService;
271+
this.position = fileTransformerConfiguration.position;
226272
}
227273

228274
@Override
@@ -243,10 +289,25 @@ public Builder executorService(ExecutorService executorService) {
243289
return this;
244290
}
245291

292+
@Override
293+
public Builder position(Long position) {
294+
this.position = position;
295+
return this;
296+
}
297+
246298
@Override
247299
public FileTransformerConfiguration build() {
248300
return new FileTransformerConfiguration(this);
249301
}
250302
}
251303

252-
}
304+
@Override
305+
public String toString() {
306+
return ToString.builder("FileTransformerConfiguration")
307+
.add("fileWriteOption", this.fileWriteOption)
308+
.add("failureBehavior", this.failureBehavior)
309+
.add("executorService", this.executorService)
310+
.add("position", this.position)
311+
.build();
312+
}
313+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_OR_APPEND_TO_EXISTING;
19+
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.WRITE_TO_POSITION;
1920
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2021

2122
import java.io.IOException;
@@ -42,6 +43,7 @@
4243
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4344
import software.amazon.awssdk.core.async.SdkPublisher;
4445
import software.amazon.awssdk.core.exception.SdkClientException;
46+
import software.amazon.awssdk.utils.Validate;
4547

4648
/**
4749
* {@link AsyncResponseTransformer} that writes the data to the specified file.
@@ -58,19 +60,21 @@ public final class FileAsyncResponseTransformer<ResponseT> implements AsyncRespo
5860
private final FileTransformerConfiguration configuration;
5961

6062
public FileAsyncResponseTransformer(Path path) {
61-
this.path = path;
62-
this.configuration = FileTransformerConfiguration.defaultCreateNew();
63-
this.position = 0L;
63+
this(path, FileTransformerConfiguration.defaultCreateNew(), 0L);
6464
}
6565

6666
public FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fileConfiguration) {
67+
this(path, fileConfiguration, determineFilePositionToWrite(path, fileConfiguration));
68+
}
69+
70+
private FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fileTransformerConfiguration, long position) {
6771
this.path = path;
68-
this.configuration = fileConfiguration;
69-
this.position = determineFilePositionToWrite(path);
72+
this.configuration = fileTransformerConfiguration;
73+
this.position = position;
7074
}
7175

72-
private long determineFilePositionToWrite(Path path) {
73-
if (configuration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
76+
private static long determineFilePositionToWrite(Path path, FileTransformerConfiguration fileConfiguration) {
77+
if (fileConfiguration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
7478
try {
7579
return Files.size(path);
7680
} catch (NoSuchFileException e) {
@@ -79,6 +83,9 @@ private long determineFilePositionToWrite(Path path) {
7983
throw SdkClientException.create("Cannot determine the current file size " + path, exception);
8084
}
8185
}
86+
if (fileConfiguration.fileWriteOption() == WRITE_TO_POSITION) {
87+
return Validate.getOrDefault(fileConfiguration.position(), () -> 0L);
88+
}
8289
return 0L;
8390
}
8491

@@ -95,6 +102,9 @@ private AsynchronousFileChannel createChannel(Path path) throws IOException {
95102
case CREATE_NEW:
96103
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
97104
break;
105+
case WRITE_TO_POSITION:
106+
Collections.addAll(options, StandardOpenOption.WRITE);
107+
break;
98108
default:
99109
throw new IllegalArgumentException("Unsupported file write option: " + configuration.fileWriteOption());
100110
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/FileTransformerConfigurationTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,33 @@
1616
package software.amazon.awssdk.core;
1717

1818
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
20+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
1921
import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.DELETE;
2022
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_NEW;
2123

2224
import nl.jqno.equalsverifier.EqualsVerifier;
2325
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.EnumSource;
2428

2529
class FileTransformerConfigurationTest {
2630

31+
@ParameterizedTest
32+
@EnumSource(
33+
value = FileTransformerConfiguration.FileWriteOption.class,
34+
names = {"CREATE_NEW", "CREATE_OR_REPLACE_EXISTING", "CREATE_OR_APPEND_TO_EXISTING"})
35+
void position_whenUsedWithNotWriteToPosition_shouldThrowIllegalArgumentException(
36+
FileTransformerConfiguration.FileWriteOption fileWriteOption) {
37+
FileTransformerConfiguration.Builder builder = FileTransformerConfiguration.builder()
38+
.position(123L)
39+
.failureBehavior(DELETE)
40+
.fileWriteOption(fileWriteOption);
41+
assertThatThrownBy(builder::build)
42+
.isInstanceOf(IllegalArgumentException.class)
43+
.hasMessageContaining(fileWriteOption.name());
44+
}
45+
2746
@Test
2847
void equalsHashcode() {
2948
EqualsVerifier.forClass(FileTransformerConfiguration.class)

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java

Lines changed: 77 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import java.nio.file.FileAlreadyExistsException;
2929
import java.nio.file.FileSystem;
3030
import java.nio.file.Files;
31+
import java.nio.file.NoSuchFileException;
3132
import java.nio.file.Path;
3233
import java.util.ArrayList;
33-
import java.util.Arrays;
3434
import java.util.Collection;
3535
import java.util.List;
3636
import java.util.concurrent.Callable;
@@ -50,6 +50,7 @@
5050
import org.reactivestreams.Subscription;
5151
import software.amazon.awssdk.core.FileTransformerConfiguration;
5252
import software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption;
53+
import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior;
5354
import software.amazon.awssdk.core.async.SdkPublisher;
5455

5556
/**
@@ -185,8 +186,11 @@ void createOrAppendExisting_fileExists_shouldAppend() throws Exception {
185186
@MethodSource("configurations")
186187
void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration) throws Exception {
187188
Path testPath = testFs.getPath("test_file.txt");
188-
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
189-
configuration);
189+
if (configuration.fileWriteOption() == FileWriteOption.WRITE_TO_POSITION) {
190+
// file must exist for WRITE_TO_POSITION
191+
Files.write(testPath, "foobar".getBytes(StandardCharsets.UTF_8));
192+
}
193+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration);
190194
stubException(RandomStringUtils.random(200), transformer);
191195
if (configuration.failureBehavior() == LEAVE) {
192196
assertThat(testPath).exists();
@@ -196,28 +200,19 @@ void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configura
196200
}
197201

198202
private static List<FileTransformerConfiguration> configurations() {
199-
return Arrays.asList(
200-
FileTransformerConfiguration.defaultCreateNew(),
201-
FileTransformerConfiguration.defaultCreateOrAppend(),
202-
FileTransformerConfiguration.defaultCreateOrReplaceExisting(),
203-
FileTransformerConfiguration.builder()
204-
.fileWriteOption(FileWriteOption.CREATE_NEW)
205-
.failureBehavior(LEAVE).build(),
206-
FileTransformerConfiguration.builder()
207-
.fileWriteOption(FileWriteOption.CREATE_NEW)
208-
.failureBehavior(DELETE).build(),
209-
FileTransformerConfiguration.builder()
210-
.fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
211-
.failureBehavior(DELETE).build(),
212-
FileTransformerConfiguration.builder()
213-
.fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
214-
.failureBehavior(LEAVE).build(),
215-
FileTransformerConfiguration.builder()
216-
.fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
217-
.failureBehavior(DELETE).build(),
218-
FileTransformerConfiguration.builder()
219-
.fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
220-
.failureBehavior(LEAVE).build());
203+
List<FileTransformerConfiguration> conf = new ArrayList<>();
204+
conf.add(FileTransformerConfiguration.defaultCreateNew());
205+
conf.add(FileTransformerConfiguration.defaultCreateOrAppend());
206+
conf.add(FileTransformerConfiguration.defaultCreateOrReplaceExisting());
207+
for (FailureBehavior failureBehavior : FailureBehavior.values()) {
208+
for (FileWriteOption fileWriteOption : FileWriteOption.values()) {
209+
conf.add(FileTransformerConfiguration.builder()
210+
.fileWriteOption(fileWriteOption)
211+
.failureBehavior(failureBehavior)
212+
.build());
213+
}
214+
}
215+
return conf;
221216
}
222217

223218
@Test
@@ -246,6 +241,63 @@ void explicitExecutor_shouldUseExecutor() throws Exception {
246241
}
247242
}
248243

244+
@Test
245+
void writeToPosition_fileExists_shouldAppendFromPosition() throws Exception {
246+
int totalSize = 100;
247+
long prefixSize = 80L;
248+
int newContentLength = 20;
249+
250+
Path testPath = testFs.getPath("test_file.txt");
251+
String contentBeforeRewrite = RandomStringUtils.randomAlphanumeric(totalSize);
252+
byte[] existingBytes = contentBeforeRewrite.getBytes(StandardCharsets.UTF_8);
253+
Files.write(testPath, existingBytes);
254+
String newContent = RandomStringUtils.randomAlphanumeric(newContentLength);
255+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(
256+
testPath,
257+
FileTransformerConfiguration.builder()
258+
.position(prefixSize)
259+
.failureBehavior(DELETE)
260+
.fileWriteOption(FileWriteOption.WRITE_TO_POSITION)
261+
.build());
262+
263+
stubSuccessfulStreaming(newContent, transformer);
264+
265+
String expectedContent = contentBeforeRewrite.substring(0, 80) + newContent;
266+
assertThat(testPath).hasContent(expectedContent);
267+
}
268+
269+
@Test
270+
void writeToPosition_fileDoesNotExists_shouldThrowException() throws Exception {
271+
Path path = testFs.getPath("this/file/does/not/exists");
272+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(path);
273+
transformer.prepare();
274+
transformer.onResponse("foobar");
275+
assertThatThrownBy(() -> transformer.onStream(testPublisher("foo-bar-content")))
276+
.hasRootCauseInstanceOf(NoSuchFileException.class);
277+
278+
}
279+
280+
@Test
281+
void writeToPosition_fileExists_positionNotDefined_shouldRewriteFromStart() throws Exception {
282+
int totalSize = 100;
283+
Path testPath = testFs.getPath("test_file.txt");
284+
String contentBeforeRewrite = RandomStringUtils.randomAlphanumeric(totalSize);
285+
byte[] existingBytes = contentBeforeRewrite.getBytes(StandardCharsets.UTF_8);
286+
Files.write(testPath, existingBytes);
287+
String newContent = RandomStringUtils.randomAlphanumeric(totalSize);
288+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(
289+
testPath,
290+
FileTransformerConfiguration.builder()
291+
.failureBehavior(DELETE)
292+
.fileWriteOption(FileWriteOption.WRITE_TO_POSITION)
293+
.build());
294+
295+
stubSuccessfulStreaming(newContent, transformer);
296+
297+
assertThat(testPath).hasContent(newContent);
298+
299+
}
300+
249301
private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
250302
CompletableFuture<String> future = transformer.prepare();
251303
transformer.onResponse("foobar");

0 commit comments

Comments
 (0)