Skip to content

Commit 42f967e

Browse files
Merge pull request ReactiveX#397 from benjchristensen/apache-async-http
Observable API for Apache HttpAsyncClient 4.0
2 parents 49d1cad + a4327e0 commit 42f967e

File tree

10 files changed

+776
-1
lines changed

10 files changed

+776
-1
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
apply plugin: 'osgi'
2+
3+
sourceCompatibility = JavaVersion.VERSION_1_6
4+
targetCompatibility = JavaVersion.VERSION_1_6
5+
6+
dependencies {
7+
compile project(':rxjava-core')
8+
compile 'org.apache.httpcomponents:httpclient:4.3'
9+
compile 'org.apache.httpcomponents:httpcore-nio:4.3'
10+
compile 'org.apache.httpcomponents:httpasyncclient:4.0-beta4'
11+
}
12+
13+
jar {
14+
manifest {
15+
name = 'rxjava-apache-http'
16+
instruction 'Bundle-Vendor', 'Netflix'
17+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
18+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
19+
}
20+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.apache.http;
17+
18+
import org.apache.http.HttpResponse;
19+
import org.apache.http.client.HttpClient;
20+
import org.apache.http.concurrent.FutureCallback;
21+
import org.apache.http.nio.client.HttpAsyncClient;
22+
import org.apache.http.nio.client.methods.HttpAsyncMethods;
23+
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
24+
25+
import rx.Observable;
26+
import rx.Observable.OnSubscribeFunc;
27+
import rx.Observer;
28+
import rx.Subscription;
29+
import rx.apache.http.consumers.ResponseConsumerDelegate;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.Subscriptions;
32+
33+
/**
34+
* An {@link Observable} interface to Apache {@link HttpAsyncClient}.
35+
* <p>
36+
* The initial {@link HttpResponse} is returned via {@link Observer#onNext} wrapped in a {@link ObservableHttpResponse}.
37+
* <p>
38+
* The content stream is retrieved from {@link ObservableHttpResponse#getContent()}.
39+
* <p>
40+
* It is aware of Content-Type <i>text/event-stream</i> and will stream each event via {@link Observer#onNext}.
41+
* <p>
42+
* Other Content-Types will be returned as a single call to {@link Observer#onNext}.
43+
* <p>
44+
* Examples:
45+
* <p>
46+
* <pre> {@code
47+
* ObservableHttp.createGet("http://www.wikipedia.com", httpClient).toObservable();
48+
* } </pre>
49+
* <p>
50+
* <pre> {@code
51+
* ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://www.wikipedia.com"), httpClient).toObservable();
52+
* } </pre>
53+
*
54+
* An {@link HttpClient} can be created like this:
55+
*
56+
* <pre> {@code
57+
* CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
58+
* httpClient.start(); // start it
59+
* httpClient.stop(); // stop it
60+
* } </pre>
61+
* <p>
62+
* A client with custom configurations can be created like this:
63+
* </p>
64+
* <pre> {@code
65+
* final RequestConfig requestConfig = RequestConfig.custom()
66+
* .setSocketTimeout(1000)
67+
* .setConnectTimeout(200).build();
68+
* final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
69+
* .setDefaultRequestConfig(requestConfig)
70+
* .setMaxConnPerRoute(20)
71+
* .setMaxConnTotal(50)
72+
* .build();
73+
* httpClient.start();
74+
* }</pre>
75+
* <p>
76+
*
77+
* @param <T>
78+
*/
79+
public class ObservableHttp<T> {
80+
81+
private final OnSubscribeFunc<T> onSubscribe;
82+
83+
private ObservableHttp(OnSubscribeFunc<T> onSubscribe) {
84+
this.onSubscribe = onSubscribe;
85+
}
86+
87+
private static <T> ObservableHttp<T> create(OnSubscribeFunc<T> onSubscribe) {
88+
return new ObservableHttp<T>(onSubscribe);
89+
}
90+
91+
public Observable<T> toObservable() {
92+
return Observable.create(new OnSubscribeFunc<T>() {
93+
94+
@Override
95+
public Subscription onSubscribe(Observer<? super T> observer) {
96+
return onSubscribe.onSubscribe(observer);
97+
}
98+
});
99+
}
100+
101+
public static ObservableHttp<ObservableHttpResponse> createGet(String uri, final HttpAsyncClient client) {
102+
return createRequest(HttpAsyncMethods.createGet(uri), client);
103+
}
104+
105+
/**
106+
* Execute request using {@link HttpAsyncRequestProducer} to define HTTP Method, URI and payload (if applicable).
107+
* <p>
108+
* If the response is chunked (or flushed progressively such as with <i>text/event-stream</i> <a href="http://www.w3.org/TR/2009/WD-eventsource-20091029/">Server-Sent Events</a>) this will call
109+
* {@link Observer#onNext} multiple times.
110+
* <p>
111+
* Use {@code HttpAsyncMethods.create* } factory methods to create {@link HttpAsyncRequestProducer} instances.
112+
* <p>
113+
* A client can be retrieved like this:
114+
* <p>
115+
* <pre> {@code CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); } </pre> </p>
116+
* <p>
117+
* A client with custom configurations can be created like this:
118+
* </p>
119+
* <pre> {@code
120+
* final RequestConfig requestConfig = RequestConfig.custom()
121+
* .setSocketTimeout(3000)
122+
* .setConnectTimeout(3000).build();
123+
* final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
124+
* .setDefaultRequestConfig(requestConfig)
125+
* .setMaxConnPerRoute(20)
126+
* .setMaxConnTotal(50)
127+
* .build();
128+
* httpclient.start();
129+
* }</pre>
130+
*
131+
*
132+
* @param requestProducer
133+
* @param client
134+
* @return
135+
*/
136+
public static ObservableHttp<ObservableHttpResponse> createRequest(final HttpAsyncRequestProducer requestProducer, final HttpAsyncClient client) {
137+
138+
return ObservableHttp.create(new OnSubscribeFunc<ObservableHttpResponse>() {
139+
140+
@Override
141+
public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> observer) {
142+
143+
final CompositeSubscription parentSubscription = new CompositeSubscription();
144+
145+
// return a Subscription that wraps the Future so it can be cancelled
146+
parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription),
147+
new FutureCallback<HttpResponse>() {
148+
149+
@Override
150+
public void completed(HttpResponse result) {
151+
observer.onCompleted();
152+
}
153+
154+
@Override
155+
public void failed(Exception ex) {
156+
observer.onError(ex);
157+
}
158+
159+
@Override
160+
public void cancelled() {
161+
observer.onCompleted();
162+
}
163+
164+
})));
165+
166+
return parentSubscription;
167+
}
168+
});
169+
}
170+
171+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.apache.http;
17+
18+
import org.apache.http.HttpResponse;
19+
20+
import rx.Observable;
21+
22+
/**
23+
* The {@link HttpResponse} for the entire request and accessor to {@link Observable} of the content stream.
24+
*/
25+
public class ObservableHttpResponse {
26+
27+
private final HttpResponse response;
28+
private final Observable<byte[]> contentSubscription;
29+
30+
public ObservableHttpResponse(HttpResponse response, Observable<byte[]> contentSubscription) {
31+
this.response = response;
32+
this.contentSubscription = contentSubscription;
33+
}
34+
35+
/**
36+
* The {@link HttpResponse} returned by the Apache client at the beginning of the response.
37+
*
38+
* @return {@link HttpResponse} with HTTP status codes, headers, etc
39+
*/
40+
public HttpResponse getResponse() {
41+
return response;
42+
}
43+
44+
/**
45+
* If the response is not chunked then only a single array will be returned. If chunked then multiple arrays.
46+
*/
47+
public Observable<byte[]> getContent() {
48+
return contentSubscription;
49+
}
50+
51+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.apache.http.consumers;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
21+
import org.apache.http.nio.util.ExpandableBuffer;
22+
import org.apache.http.nio.util.HeapByteBufferAllocator;
23+
24+
class ExpandableByteBuffer extends ExpandableBuffer {
25+
public ExpandableByteBuffer(int size) {
26+
super(size, HeapByteBufferAllocator.INSTANCE);
27+
}
28+
29+
public ExpandableByteBuffer() {
30+
super(4 * 1024, HeapByteBufferAllocator.INSTANCE);
31+
}
32+
33+
public void addByte(byte b) {
34+
if (this.buffer.remaining() == 0) {
35+
expand();
36+
}
37+
this.buffer.put(b);
38+
}
39+
40+
public boolean hasContent() {
41+
return this.buffer.position() > 0;
42+
}
43+
44+
public byte[] getBytes() {
45+
byte[] data = new byte[this.buffer.position()];
46+
this.buffer.position(0);
47+
this.buffer.get(data);
48+
return data;
49+
}
50+
51+
public void reset() {
52+
clear();
53+
}
54+
55+
public void consumeInputStream(InputStream content) throws IOException {
56+
try {
57+
int b = -1;
58+
while ((b = content.read()) != -1) {
59+
addByte((byte) b);
60+
}
61+
} finally {
62+
content.close();
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)