Skip to content

+tck add missing public modifiers, #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public IdentityProcessorVerification(final TestEnvironment env, long publisherSh

this.subscriberVerification = new SubscriberVerification<T>(env) {
@Override
Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
return IdentityProcessorVerification.this.createSubscriber(probe);
}

@Override
Publisher<T> createHelperPublisher(int elements) {
public Publisher<T> createHelperPublisher(int elements) {
return IdentityProcessorVerification.this.createHelperPublisher(elements);
}
};
Expand Down Expand Up @@ -156,30 +156,28 @@ public void mustStartProducingWithTheOldestStillAvailableElementForASubscriber()
// must call `onError` on all its subscribers if it encounters a non-recoverable error
@Test
public void mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
new TestSetup(env, testBufferSize) {
{
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub1);
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub2);

sub1.requestMore(1);
expectRequestMore();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.requestMore(1);
new TestSetup(env, testBufferSize) {{
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub1);
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub2);

// sub1 now has received and element and has 1 pending
// sub2 has not yet requested anything
sub1.requestMore(1);
expectRequestMore();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.requestMore(1);

Exception ex = new RuntimeException("Test exception");
sendError(ex);
sub1.expectError(ex);
sub2.expectError(ex);
// sub1 now has received and element and has 1 pending
// sub2 has not yet requested anything

env.verifyNoAsyncErrors();
}
};
Exception ex = new RuntimeException("Test exception");
sendError(ex);
sub1.expectError(ex);
sub2.expectError(ex);

env.verifyNoAsyncErrors();
}};
}

@Test
Expand Down Expand Up @@ -498,7 +496,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws

/////////////////////// TEST INFRASTRUCTURE //////////////////////

abstract class TestSetup extends ManualPublisher<T> {
public abstract class TestSetup extends ManualPublisher<T> {
private TestEnvironment.ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
private Set<T> seenTees = new HashSet<T>();

Expand Down Expand Up @@ -540,7 +538,7 @@ public T sendNextTFromUpstream() throws InterruptedException {
}
}

private class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
TestEnvironment.Promise<Throwable> error;

public ManualSubscriberWithErrorCollection(TestEnvironment env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() {

/////////////////////// TEST INFRASTRUCTURE //////////////////////

interface PublisherTestRun<T> {
public interface PublisherTestRun<T> {
public void run(Publisher<T> pub) throws Throwable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ protected SubscriberVerification(TestEnvironment env) {
* In order to be meaningfully testable your Subscriber must inform the given
* `SubscriberProbe` of the respective events having been received.
*/
abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);
public abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);

/**
* Helper method required for generating test elements.
* It must create a Publisher for a stream with exactly the given number of elements.
* If `elements` is zero the produced stream must be infinite.
*/
abstract Publisher<T> createHelperPublisher(int elements);
public abstract Publisher<T> createHelperPublisher(int elements);

////////////////////// TEST SETUP VERIFICATION ///////////////////////////

@Test
void exerciseHappyPath() throws InterruptedException {
public void exerciseHappyPath() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);

Expand Down Expand Up @@ -63,7 +63,7 @@ void exerciseHappyPath() throws InterruptedException {
// must asynchronously schedule a respective event to the subscriber
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
@Test
void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
// cannot be meaningfully tested, or can it?
}

Expand All @@ -72,14 +72,14 @@ void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
// must consider the Subscription cancelled after having received the event
@Test
void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
// cannot be meaningfully tested, or can it?
}

// A Subscriber
// must not accept an `onSubscribe` event if it already has an active Subscription
@Test
void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
new TestSetup(env) {{
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
sub().onSubscribe(
Expand All @@ -100,7 +100,7 @@ public void cancel() {
// A Subscriber
// must call Subscription::cancel during shutdown if it still has an active Subscription
@Test
void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerShutdown();
expectCancelling();
Expand All @@ -112,14 +112,14 @@ void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription()
// A Subscriber
// must ensure that all calls on a Subscription take place from the same thread or provide for respective external synchronization
@Test
void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
// cannot be meaningfully tested, or can it?
}

// A Subscriber
// must be prepared to receive one or more `onNext` events after having called Subscription::cancel
@Test
void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
puppet().triggerCancel();
Expand All @@ -133,7 +133,7 @@ void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCa
// A Subscriber
// must be prepared to receive an `onComplete` event with a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
sendCompletion();
Expand All @@ -146,7 +146,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMo
// A Subscriber
// must be prepared to receive an `onComplete` event without a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
sendCompletion();
probe.expectCompletion();
Expand All @@ -158,7 +158,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionReques
// A Subscriber
// must be prepared to receive an `onError` event with a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
Exception ex = new RuntimeException("Test exception");
Expand All @@ -172,7 +172,7 @@ void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore(
// A Subscriber
// must be prepared to receive an `onError` event without a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
Exception ex = new RuntimeException("Test exception");
sendError(ex);
Expand All @@ -184,15 +184,15 @@ void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMo
// A Subscriber
// must make sure that all calls on its `onXXX` methods happen-before the processing of the respective events
@Test
void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
// cannot be meaningfully tested, or can it?
}

/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

/////////////////////// TEST INFRASTRUCTURE //////////////////////

class TestSetup extends ManualPublisher<T> {
public class TestSetup extends ManualPublisher<T> {
ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
Probe probe;
T lastT = null;
Expand All @@ -205,24 +205,24 @@ public TestSetup(TestEnvironment env) throws InterruptedException {
probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
}

Subscriber<T> sub() {
public Subscriber<T> sub() {
return subscriber.get();
}

SubscriberPuppet puppet() {
public SubscriberPuppet puppet() {
return probe.puppet.value();
}

void sendNextTFromUpstream() throws InterruptedException {
public void sendNextTFromUpstream() throws InterruptedException {
sendNext(nextT());
}

T nextT() throws InterruptedException {
public T nextT() throws InterruptedException {
lastT = tees.requestNextElement();
return lastT;
}

class Probe implements SubscriberProbe<T> {
public class Probe implements SubscriberProbe<T> {
Promise<SubscriberPuppet> puppet = new Promise<SubscriberPuppet>(env);
Receptacle<T> elements = new Receptacle<T>(env);
Latch completed = new Latch(env);
Expand All @@ -248,30 +248,30 @@ public void registerOnError(Throwable cause) {
error.complete(cause);
}

void expectNext(T expected) throws InterruptedException {
public void expectNext(T expected) throws InterruptedException {
expectNext(expected, env.defaultTimeoutMillis());
}

void expectNext(T expected, long timeoutMillis) throws InterruptedException {
public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
if (!received.equals(expected)) {
env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
}
}

void expectCompletion() throws InterruptedException {
public void expectCompletion() throws InterruptedException {
expectCompletion(env.defaultTimeoutMillis());
}

void expectCompletion(long timeoutMillis) throws InterruptedException {
public void expectCompletion(long timeoutMillis) throws InterruptedException {
completed.expectClose(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
}

void expectError(Throwable expected) throws InterruptedException {
public void expectError(Throwable expected) throws InterruptedException {
expectError(expected, env.defaultTimeoutMillis());
}

void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
if (error.value() != expected) {
env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
Expand All @@ -284,7 +284,7 @@ public void verifyNoAsyncErrors() {
}
}

interface SubscriberProbe<T> {
public interface SubscriberProbe<T> {
/**
* Must be called by the test subscriber when it has received the `onSubscribe` event.
*/
Expand All @@ -306,7 +306,7 @@ interface SubscriberProbe<T> {
void registerOnError(Throwable cause);
}

interface SubscriberPuppet {
public interface SubscriberPuppet {
void triggerShutdown();

void triggerRequestMore(int elements);
Expand Down
Loading