|
16 | 16 | package software.amazon.awssdk.core.async;
|
17 | 17 |
|
18 | 18 | import java.nio.ByteBuffer;
|
| 19 | + |
19 | 20 | import org.reactivestreams.Subscriber;
|
20 | 21 | import org.reactivestreams.Subscription;
|
21 | 22 | import software.amazon.awssdk.annotations.SdkInternalApi;
|
|
31 | 32 | @SdkInternalApi
|
32 | 33 | final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
|
33 | 34 |
|
34 |
| - private final byte[] bytes; |
| 35 | + private final byte[] bytes; |
35 | 36 |
|
36 | 37 | ByteArrayAsyncRequestBody(byte[] bytes) {
|
37 | 38 | this.bytes = bytes.clone();
|
38 | 39 | }
|
39 | 40 |
|
40 |
| - @Override |
41 |
| - public long contentLength() { |
42 |
| - return bytes.length; |
43 |
| - } |
44 |
| - |
45 |
| - @Override |
46 |
| - public void subscribe(Subscriber<? super ByteBuffer> subscriber) { |
47 |
| - // FIXME missing protection abiding to rule 1.9, proposal: |
48 |
| - // As per rule 1.09, we need to throw a `java.lang.NullPointerException` |
49 |
| - // if the `Subscriber` is `null` |
50 |
| - // if (subscriber == null) throw null; |
| 41 | + @Override |
| 42 | + public long contentLength() { |
| 43 | + return bytes.length; |
| 44 | + } |
51 | 45 |
|
52 |
| - // FIXME: onSubscribe is user code, and could be ill behaved, as library we should protect from this, |
53 |
| - // FIXME: This is covered by spec rule 2.13; proposal: |
54 |
| - // As per 2.13, this method must return normally (i.e. not throw). |
55 |
| - // try { |
56 |
| - subscriber.onSubscribe( |
57 |
| - new Subscription() { |
58 |
| - @Override |
59 |
| - public void request(long n) { |
60 |
| - if (n > 0) { |
61 |
| - subscriber.onNext(ByteBuffer.wrap(bytes)); |
62 |
| - subscriber.onComplete(); |
63 |
| - } |
64 |
| - } |
65 |
| - // FIXME missing required validation code (rule 1.9): |
66 |
| - // "Non-positive requests should be honored with IllegalArgumentException" |
67 |
| - // proposal: |
68 |
| - // else { |
69 |
| - // subscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); |
70 |
| - // } |
| 46 | + @Override |
| 47 | + public void subscribe(Subscriber<? super ByteBuffer> s) { |
| 48 | + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` |
| 49 | + if (s == null) throw new NullPointerException("Subscription MUST NOT be null."); |
71 | 50 |
|
72 |
| - @Override |
73 |
| - public void cancel() { |
| 51 | + // As per 2.13, this method must return normally (i.e. not throw). |
| 52 | + try { |
| 53 | + s.onSubscribe( |
| 54 | + new Subscription() { |
| 55 | + boolean done = false; |
| 56 | + @Override |
| 57 | + public void request(long n) { |
| 58 | + if (n > 0) { |
| 59 | + if (!done) { |
| 60 | + s.onNext(ByteBuffer.wrap(bytes)); |
| 61 | + done = true; |
| 62 | + s.onComplete(); |
74 | 63 | }
|
| 64 | + } else { |
| 65 | + s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); |
| 66 | + } |
| 67 | + } |
| 68 | + |
| 69 | + @Override |
| 70 | + public void cancel() { |
75 | 71 | }
|
76 |
| - ); |
77 |
| - // end of implementing 2.13 spec requirement |
78 |
| - // } catch (Throwable ex) { |
79 |
| - // new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " + |
80 |
| - // "by throwing an exception from onSubscribe.", ex) |
81 |
| - // // When onSubscribe fails this way, we don't know what state the |
82 |
| - // // subscriber is thus calling onError may cause more crashes. |
83 |
| - // .printStackTrace(); |
84 |
| - // } |
| 72 | + } |
| 73 | + ); |
| 74 | + } catch (Throwable ex) { |
| 75 | + new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " + |
| 76 | + "by throwing an exception from onSubscribe.", ex) |
| 77 | + // When onSubscribe fails this way, we don't know what state the |
| 78 | + // s is thus calling onError may cause more crashes. |
| 79 | + .printStackTrace(); |
85 | 80 | }
|
| 81 | + } |
86 | 82 | }
|
0 commit comments