16
16
package software .amazon .awssdk .core .async ;
17
17
18
18
import java .io .IOException ;
19
- import java .io .UncheckedIOException ;
20
19
import java .nio .ByteBuffer ;
21
20
import java .nio .channels .AsynchronousFileChannel ;
22
21
import java .nio .channels .CompletionHandler ;
23
- import java .nio .file .Files ;
24
22
import java .nio .file .Path ;
25
23
import java .nio .file .StandardOpenOption ;
26
24
import java .util .concurrent .atomic .AtomicLong ;
27
25
import org .reactivestreams .Subscriber ;
28
26
import org .reactivestreams .Subscription ;
29
27
import software .amazon .awssdk .annotations .SdkInternalApi ;
28
+ import software .amazon .awssdk .core .util .async .NoopSubscription ;
30
29
import software .amazon .awssdk .utils .builder .SdkBuilder ;
31
30
32
31
/**
@@ -43,7 +42,7 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
43
42
/**
44
43
* File to read.
45
44
*/
46
- private final Path file ;
45
+ private final Path path ;
47
46
48
47
/**
49
48
* Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
@@ -52,18 +51,33 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
52
51
53
52
54
53
private FileAsyncRequestBody (DefaultBuilder builder ) {
55
- this .file = builder .path ;
54
+ this .path = builder .path ;
56
55
this .chunkSizeInBytes = builder .chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder .chunkSizeInBytes ;
57
56
}
58
57
59
58
@ Override
60
59
public long contentLength () {
61
- return file .toFile ().length ();
60
+ return path .toFile ().length ();
62
61
}
63
62
64
63
@ Override
65
64
public void subscribe (Subscriber <? super ByteBuffer > s ) {
66
- s .onSubscribe (new FileSubscription (file , s , chunkSizeInBytes ));
65
+ try {
66
+ AsynchronousFileChannel channel = openInputChannel (this .path );
67
+
68
+ // We need to synchronize here because the subscriber could call
69
+ // request() from within onSubscribe which would potentially
70
+ // trigger onNext before onSubscribe is finished.
71
+ Subscription subscription = new FileSubscription (channel , s , chunkSizeInBytes );
72
+ synchronized (subscription ) {
73
+ s .onSubscribe (subscription );
74
+ }
75
+ } catch (IOException e ) {
76
+ // subscribe() must return normally, so we need to signal the
77
+ // failure to open via onError() once onSubscribe() is signaled.
78
+ s .onSubscribe (new NoopSubscription (s ));
79
+ s .onError (e );
80
+ }
67
81
}
68
82
69
83
/**
@@ -135,37 +149,43 @@ public FileAsyncRequestBody build() {
135
149
* Reads the file for one subscriber.
136
150
*/
137
151
private static class FileSubscription implements Subscription {
138
-
139
152
private final AsynchronousFileChannel inputChannel ;
140
153
private final Subscriber <? super ByteBuffer > subscriber ;
141
154
private final int chunkSize ;
142
155
143
156
private long position = 0 ;
144
157
private AtomicLong outstandingDemand = new AtomicLong (0 );
145
158
private boolean writeInProgress = false ;
159
+ private volatile boolean done = false ;
146
160
147
- private FileSubscription (Path file , Subscriber <? super ByteBuffer > subscriber , int chunkSize ) {
148
- this .inputChannel = openInputChannel ( file ) ;
161
+ private FileSubscription (AsynchronousFileChannel inputChannel , Subscriber <? super ByteBuffer > subscriber , int chunkSize ) {
162
+ this .inputChannel = inputChannel ;
149
163
this .subscriber = subscriber ;
150
164
this .chunkSize = chunkSize ;
151
165
}
152
166
153
167
@ Override
154
168
public void request (long n ) {
169
+ if (done ) {
170
+ return ;
171
+ }
172
+
155
173
if (n < 1 ) {
156
174
IllegalArgumentException ex =
157
- new IllegalArgumentException (subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements." );
158
- subscriber .onError (ex );
159
- } else
175
+ new IllegalArgumentException (subscriber + " violated the Reactive Streams rule 3.9 by requesting a "
176
+ + "non-positive number of elements." );
177
+ signalOnError (ex );
178
+ } else {
160
179
try {
161
- long initialDemand = outstandingDemand .get ();
162
- long newDemand = initialDemand + n ;
163
- if (newDemand < 1 ) {
164
- // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
165
- outstandingDemand .set (Long .MAX_VALUE );
166
- } else {
167
- outstandingDemand .set (newDemand );
168
- }
180
+ // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as
181
+ // "effectively unbounded"
182
+ outstandingDemand .getAndUpdate (initialDemand -> {
183
+ if (Long .MAX_VALUE - initialDemand < n ) {
184
+ return Long .MAX_VALUE ;
185
+ } else {
186
+ return initialDemand + n ;
187
+ }
188
+ });
169
189
170
190
synchronized (this ) {
171
191
if (!writeInProgress ) {
@@ -174,47 +194,54 @@ public void request(long n) {
174
194
}
175
195
}
176
196
} catch (Exception e ) {
177
- subscriber . onError (e );
197
+ signalOnError (e );
178
198
}
199
+ }
179
200
}
180
201
181
202
@ Override
182
203
public void cancel () {
183
- closeFile ();
204
+ synchronized (this ) {
205
+ if (!done ) {
206
+ done = true ;
207
+ closeFile ();
208
+ }
209
+ }
184
210
}
185
211
186
212
private void readData () {
187
213
// It's possible to have another request for data come in after we've closed the file.
188
214
if (!inputChannel .isOpen ()) {
189
215
return ;
190
216
}
217
+
191
218
final ByteBuffer buffer = ByteBuffer .allocate (chunkSize );
192
219
inputChannel .read (buffer , position , buffer , new CompletionHandler <Integer , ByteBuffer >() {
193
220
@ Override
194
221
public void completed (Integer result , ByteBuffer attachment ) {
195
222
if (result > 0 ) {
196
223
attachment .flip ();
197
224
position += attachment .remaining ();
198
- subscriber . onNext (attachment );
225
+ signalOnNext (attachment );
199
226
// If we have more permits, queue up another read.
200
227
if (outstandingDemand .decrementAndGet () > 0 ) {
201
228
readData ();
202
229
return ;
203
230
}
204
231
} else {
205
232
// Reached the end of the file, notify the subscriber and cleanup
206
- subscriber . onComplete ();
233
+ signalOnComplete ();
207
234
closeFile ();
208
235
}
209
236
210
- synchronized (FileSubscription . this ) {
237
+ synchronized (this ) {
211
238
writeInProgress = false ;
212
239
}
213
240
}
214
241
215
242
@ Override
216
243
public void failed (Throwable exc , ByteBuffer attachment ) {
217
- subscriber . onError (exc );
244
+ signalOnError (exc );
218
245
closeFile ();
219
246
}
220
247
});
@@ -224,20 +251,38 @@ private void closeFile() {
224
251
try {
225
252
inputChannel .close ();
226
253
} catch (IOException e ) {
227
- throw new UncheckedIOException (e );
254
+ signalOnError (e );
228
255
}
229
256
}
230
257
231
- }
258
+ private void signalOnNext (ByteBuffer bb ) {
259
+ synchronized (this ) {
260
+ if (!done ) {
261
+ subscriber .onNext (bb );
262
+ }
263
+ }
264
+ }
232
265
233
- private static AsynchronousFileChannel openInputChannel (Path path ) {
234
- try {
235
- if (!Files .exists (path )) Files .createFile (path );
236
- return AsynchronousFileChannel .open (path , StandardOpenOption .READ );
237
- } catch (IOException e ) {
238
- throw new UncheckedIOException (e );
266
+ private void signalOnComplete () {
267
+ synchronized (this ) {
268
+ if (!done ) {
269
+ subscriber .onComplete ();
270
+ done = true ;
271
+ }
272
+ }
239
273
}
240
- }
241
274
275
+ private void signalOnError (Throwable t ) {
276
+ synchronized (this ) {
277
+ if (!done ) {
278
+ subscriber .onError (t );
279
+ done = true ;
280
+ }
281
+ }
282
+ }
283
+ }
242
284
285
+ private static AsynchronousFileChannel openInputChannel (Path path ) throws IOException {
286
+ return AsynchronousFileChannel .open (path , StandardOpenOption .READ );
287
+ }
243
288
}
0 commit comments