Skip to content

Replace reactive programming library #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<gson-version>2.8.5</gson-version>
<commons-io-version>2.6</commons-io-version>
<commons-lang3-version>3.8.1</commons-lang3-version>
<jersey-version>2.25.1</jersey-version>
<rx-version>2.2.7</rx-version>
</properties>

<!-- we use the travis providers for deployment so don't need to specify repositories here -->
Expand Down Expand Up @@ -81,11 +81,6 @@
<version>${okhttp3-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.bundles.repackaged</groupId>
<artifactId>jersey-jsr166e</artifactId>
<version>${jersey-version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down Expand Up @@ -134,6 +129,11 @@
<artifactId>gson</artifactId>
<version>${gson-version}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rx-version}</version>
</dependency>
</dependencies>

<build>
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/ibm/cloud/sdk/core/http/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package com.ibm.cloud.sdk.core.http;

import jersey.repackaged.jsr166e.CompletableFuture;
import io.reactivex.Single;

/**
* Service Call.
Expand Down Expand Up @@ -62,17 +62,17 @@ public interface ServiceCall<T> {
void enqueueWithDetails(ServiceCallbackWithDetails<T> callback);

/**
* Reactive requests, in this case, you could take advantage both synchronous and asynchronous.
* Reactive request using the RxJava 2 library. See https://github.com/ReactiveX/RxJava.
*
* @return a CompletableFuture wrapper for your response
* @return a Single object containing the service call to be observed/subscribed to
*/
CompletableFuture<T> rx();
Single<T> reactiveRequest();

/**
* Reactive requests with added HTTP information. In this case, you could take advantage both synchronous and
* asynchronous.
* Reactive request using the RxJava 2 library. See https://github.com/ReactiveX/RxJava. In addition, the wrapped
* service call will contain added HTTP information.
*
* @return a CompletableFuture wrapper for your response
* @return a Single object containing the service call to be observed/subscribed to
*/
CompletableFuture<Response<T>> rxWithDetails();
Single<Response<T>> reactiveRequestWithDetails();
}
50 changes: 13 additions & 37 deletions src/main/java/com/ibm/cloud/sdk/core/service/BaseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import com.ibm.cloud.sdk.core.service.security.IamTokenManager;
import com.ibm.cloud.sdk.core.util.CredentialUtils;
import com.ibm.cloud.sdk.core.util.RequestUtils;
import jersey.repackaged.jsr166e.CompletableFuture;
import io.reactivex.Single;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Credentials;
Expand All @@ -47,6 +47,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.logging.Logger;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -539,51 +540,26 @@ public void onResponse(Call call, Response response) {
}

@Override
public CompletableFuture<T> rx() {
final CompletableFuture<T> completableFuture = new CompletableFuture<T>();

call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
completableFuture.completeExceptionally(e);
}

public Single<T> reactiveRequest() {
return Single.fromCallable(new Callable<T>() {
@Override
public void onResponse(Call call, Response response) {
try {
completableFuture.complete(processServiceCall(converter, response));
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
public T call() throws Exception {
Response response = call.execute();
return processServiceCall(converter, response);
}
});

return completableFuture;
}

@Override
public CompletableFuture<com.ibm.cloud.sdk.core.http.Response<T>> rxWithDetails() {
final CompletableFuture<com.ibm.cloud.sdk.core.http.Response<T>> completableFuture
= new CompletableFuture<>();

call.enqueue(new Callback() {
public Single<com.ibm.cloud.sdk.core.http.Response<T>> reactiveRequestWithDetails() {
return Single.fromCallable(new Callable<com.ibm.cloud.sdk.core.http.Response<T>>() {
@Override
public void onFailure(Call call, IOException e) {
completableFuture.completeExceptionally(e);
}

@Override
public void onResponse(Call call, Response response) {
try {
T responseModel = processServiceCall(converter, response);
completableFuture.complete(new com.ibm.cloud.sdk.core.http.Response<>(responseModel, response));
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
public com.ibm.cloud.sdk.core.http.Response<T> call() throws Exception {
Response httpResponse = call.execute();
T responseModel = processServiceCall(converter, httpResponse);
return new com.ibm.cloud.sdk.core.http.Response<>(responseModel, httpResponse);
}
});

return completableFuture;
}

@Override
Expand Down
109 changes: 63 additions & 46 deletions src/test/java/com/ibm/cloud/sdk/core/test/service/ResponseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,54 @@
import com.ibm.cloud.sdk.core.service.model.GenericModel;
import com.ibm.cloud.sdk.core.test.BaseServiceUnitTest;
import com.ibm.cloud.sdk.core.util.ResponseConverterUtils;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class ResponseTest extends BaseServiceUnitTest {
private class TestModel extends GenericModel {
String city;

private class TestModel extends GenericModel { }
String getKey() {
return city;
}
}

public class TestService extends BaseService {

private static final String SERVICE_NAME = "test";

public TestService() {
TestService() {
super(SERVICE_NAME);
}

public ServiceCall<TestModel> testMethod() {
ServiceCall<TestModel> testMethod() {
RequestBuilder builder = RequestBuilder.get(HttpUrl.parse(getEndPoint() + "/v1/test"));
return createServiceCall(builder.build(), ResponseConverterUtils.getObject(TestModel.class));
}

public ServiceCall<Void> testHeadMethod() {
ServiceCall<Void> testHeadMethod() {
RequestBuilder builder = RequestBuilder.head(HttpUrl.parse(getEndPoint() + "/v1/test"));
return createServiceCall(builder.build(), ResponseConverterUtils.getVoid());
}
}

private TestService service;
private String testResponseKey = "city";
private String testResponseValue = "Columbus";
private String testResponseBody = "{\"" + testResponseKey + "\": \"" + testResponseValue + "\"}";

// used for a specific test so we don't run into any weirdness with final, one-element, generic arrays
private Response<TestModel> testResponseModel = null;

/*
* (non-Javadoc)
Expand All @@ -78,10 +90,11 @@ public void setUp() throws Exception {
*/
@Test
public void testExecuteWithDetails() throws InterruptedException {
server.enqueue(new MockResponse().setBody("{\"test_key\": \"test_value\"}"));
server.enqueue(new MockResponse().setBody(testResponseBody));

Response<TestModel> response = service.testMethod().executeWithDetails();
assertNotNull(response.getResult());
assertEquals(testResponseValue, response.getResult().getKey());
assertNotNull(response.getHeaders());
}

Expand All @@ -92,63 +105,67 @@ public void testExecuteWithDetails() throws InterruptedException {
*/
@Test
public void testEnqueueWithDetails() throws InterruptedException {
server.enqueue(new MockResponse().setBody("{\"test_key\": \"test_value\"}"));
server.enqueue(new MockResponse().setBody(testResponseBody));

service.testMethod().enqueueWithDetails(new ServiceCallbackWithDetails<TestModel>() {
@Override
public void onResponse(Response<TestModel> response) {
assertNotNull(response.getResult());
assertEquals(testResponseValue, response.getResult().getKey());
assertNotNull(response.getHeaders());
}

@Override
public void onFailure(Exception e) { }
});
}

/**
* Test that all fields are populated when calling rxWithDetails() using a callback.
*
* @throws InterruptedException the interrupted exception
*/
@Test
public void testRxWithDetailsCallback() throws InterruptedException {
server.enqueue(new MockResponse().setBody("{\"test_key\": \"test_value\"}"));

service.testMethod().rxWithDetails().thenAccept(response -> {
assertNotNull(response.getResult());
assertNotNull(response.getHeaders());
});
Thread.sleep(2000);
}

/**
* Test that all fields are populated when calling rxWithDetails() using an asynchronous callback.
*
* @throws InterruptedException the interrupted exception
*/
@Test
public void testRxWithDetailsAsync() throws InterruptedException {
server.enqueue(new MockResponse().setBody("{\"test_key\": \"test_value\"}"));

service.testMethod().rxWithDetails().thenAcceptAsync(response -> {
assertNotNull(response.getResult());
assertNotNull(response.getHeaders());
});
public void testReactiveRequest() throws InterruptedException {
server.enqueue(new MockResponse().setBody(testResponseBody));

final TestModel[] responseValue = new TestModel[1];
Single<TestModel> observableRequest = service.testMethod().reactiveRequest();

observableRequest
.subscribeOn(Schedulers.single())
.subscribe(new Consumer<TestModel>() {
@Override
public void accept(TestModel testModel) throws Exception {
responseValue[0] = testModel;
}
});

// asynchronous, so test that we continued without a value yet
assertNull(responseValue[0]);
Thread.sleep(2000);
assertNotNull(responseValue[0]);
assertEquals(testResponseValue, responseValue[0].getKey());
}

/**
* Test that all fields are populated when calling rxWjthDetails() synchronously.
*
* @throws InterruptedException the interrupted exception
* @throws ExecutionException the execution exception
*/
@Test
public void testRxWithDetailsSync() throws InterruptedException, ExecutionException {
server.enqueue(new MockResponse().setBody("{\"test_key\": \"test_value\"}"));

Response<TestModel> response = service.testMethod().rxWithDetails().get();
assertNotNull(response.getResult());
assertNotNull(response.getHeaders());
public void testReactiveRequestWithDetails() throws InterruptedException {
server.enqueue(new MockResponse().setBody(testResponseBody));

Single<Response<TestModel>> observableRequest = service.testMethod().reactiveRequestWithDetails();

observableRequest
.subscribeOn(Schedulers.single())
.subscribe(new Consumer<Response<TestModel>>() {
@Override
public void accept(Response<TestModel> testModel) throws Exception {
testResponseModel = testModel;
}
});

// asynchronous, so test that we continued without a value yet
assertNull(testResponseModel);
Thread.sleep(2000);
assertNotNull(testResponseModel);
assertEquals(testResponseValue, testResponseModel.getResult().getKey());
assertNotNull(testResponseModel.getHeaders());
}

/**
Expand Down