-
Notifications
You must be signed in to change notification settings - Fork 916
Add TCK tests for reactive streams implementations #519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7598c3a
98c8851
455c116
dcfd7bc
bc78da8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"category": "AWS SDK for Java v2", | ||
"type": "feature", | ||
"description": "Incorporate the [Reactive Streams Technology Compatibility Kit](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck) and ensure current implementations are compliant. [#519](https://github.com/aws/aws-sdk-java-v2/issues/519)" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,10 @@ | |
package software.amazon.awssdk.core.async; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
|
||
/** | ||
|
@@ -43,20 +45,42 @@ public long contentLength() { | |
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { | ||
subscriber.onSubscribe( | ||
new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
if (n > 0) { | ||
subscriber.onNext(ByteBuffer.wrap(bytes)); | ||
subscriber.onComplete(); | ||
public void subscribe(Subscriber<? super ByteBuffer> s) { | ||
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null | ||
if (s == null) { | ||
throw new NullPointerException("Subscription MUST NOT be null."); | ||
} | ||
|
||
// As per 2.13, this method must return normally (i.e. not throw). | ||
try { | ||
s.onSubscribe( | ||
new Subscription() { | ||
boolean done = false; | ||
|
||
@Override | ||
public void request(long n) { | ||
if (n > 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there opportunity to extract some of these checks into a decorator? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that might be a little hard to do in a general way. For example like with the async file publisher, we need to synchronize the interactions with the |
||
if (!done) { | ||
s.onNext(ByteBuffer.wrap(bytes)); | ||
done = true; | ||
s.onComplete(); | ||
} | ||
} else { | ||
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
@Override | ||
public void cancel() { | ||
} | ||
} | ||
}); | ||
); | ||
} catch (Throwable ex) { | ||
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want to log There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's okay, it's just a diagnostic. Correct implementations should return normally from |
||
"by throwing an exception from onSubscribe.", ex) | ||
// When onSubscribe fails this way, we don't know what state the | ||
// s is thus calling onError may cause more crashes. | ||
.printStackTrace(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems weird. So we just log the error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to log it but since Since 2.13 requires the |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh how did we not have this before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it was being pulled in transitively via the SPI:
mvn dependency:tree
: