|
31 | 31 | import software.amazon.awssdk.core.async.AsyncRequestBody;
|
32 | 32 | import software.amazon.awssdk.core.internal.util.Mimetype;
|
33 | 33 | import software.amazon.awssdk.core.internal.util.NoopSubscription;
|
| 34 | +import software.amazon.awssdk.utils.Logger; |
34 | 35 | import software.amazon.awssdk.utils.builder.SdkBuilder;
|
35 | 36 |
|
36 | 37 | /**
|
|
41 | 42 | */
|
42 | 43 | @SdkInternalApi
|
43 | 44 | public final class FileAsyncRequestBody implements AsyncRequestBody {
|
| 45 | + private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class); |
44 | 46 |
|
45 | 47 | /**
|
46 | 48 | * Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
|
@@ -241,7 +243,7 @@ public void completed(Integer result, ByteBuffer attachment) {
|
241 | 243 | if (result > 0) {
|
242 | 244 | attachment.flip();
|
243 | 245 | position.addAndGet(attachment.remaining());
|
244 |
| - subscriber.onNext(attachment); |
| 246 | + signalOnNext(attachment); |
245 | 247 |
|
246 | 248 | synchronized (lock) {
|
247 | 249 | // If we have more permits, queue up another read.
|
@@ -270,21 +272,31 @@ private void closeFile() {
|
270 | 272 | try {
|
271 | 273 | inputChannel.close();
|
272 | 274 | } catch (IOException e) {
|
273 |
| - signalOnError(e); |
| 275 | + log.warn(() -> "Failed to close the file", e); |
274 | 276 | }
|
275 | 277 | }
|
276 | 278 |
|
277 |
| - private void signalOnComplete() { |
| 279 | + private void signalOnNext(ByteBuffer attachment) { |
278 | 280 | if (!done) {
|
279 |
| - done = true; |
280 |
| - subscriber.onComplete(); |
| 281 | + subscriber.onNext(attachment); |
| 282 | + } |
| 283 | + } |
| 284 | + |
| 285 | + private void signalOnComplete() { |
| 286 | + synchronized (this) { |
| 287 | + if (!done) { |
| 288 | + done = true; |
| 289 | + subscriber.onComplete(); |
| 290 | + } |
281 | 291 | }
|
282 | 292 | }
|
283 | 293 |
|
284 | 294 | private void signalOnError(Throwable t) {
|
285 |
| - if (!done) { |
286 |
| - done = true; |
287 |
| - subscriber.onError(t); |
| 295 | + synchronized (this) { |
| 296 | + if (!done) { |
| 297 | + done = true; |
| 298 | + subscriber.onError(t); |
| 299 | + } |
288 | 300 | }
|
289 | 301 | }
|
290 | 302 | }
|
|
0 commit comments