Skip to content

Commit a88fd4b

Browse files
committed
Moving demo code to test
1 parent 181a09e commit a88fd4b

File tree

2 files changed

+187
-330
lines changed
  • services/kinesis/src

2 files changed

+187
-330
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package software.amazon.awssdk.services.kinesis;
2+
3+
import java.io.UnsupportedEncodingException;
4+
import java.net.URI;
5+
import java.nio.ByteBuffer;
6+
import java.util.Random;
7+
import java.util.UUID;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import org.apache.log4j.BasicConfigurator;
13+
import org.reactivestreams.Subscriber;
14+
import org.reactivestreams.Subscription;
15+
import software.amazon.awssdk.core.auth.ProfileCredentialsProvider;
16+
import software.amazon.awssdk.core.client.builder.ClientAsyncHttpConfiguration;
17+
import software.amazon.awssdk.core.client.builder.ClientBuilder;
18+
import software.amazon.awssdk.core.flow.FlowPublisher;
19+
import software.amazon.awssdk.core.flow.FlowResponseTransformer;
20+
import software.amazon.awssdk.core.flow.ResponseIterator;
21+
import software.amazon.awssdk.core.regions.Region;
22+
import software.amazon.awssdk.http.nio.netty.NettySdkHttpClientFactory;
23+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
24+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
25+
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
26+
import software.amazon.awssdk.services.kinesis.model.RecordBatchEvent;
27+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
28+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
29+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
30+
31+
public class H2Demo {
32+
33+
private static final String ALPHA_STREAM_NAME = "foobar";
34+
private static final String DEVPERF_STREAM_NAME = "prashray-50";
35+
36+
public static final int COUNT = 500_000;
37+
public static final int INTERVAL = 10;
38+
private static final Random random = new Random();
39+
40+
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
41+
BasicConfigurator.configure();
42+
KinesisAsyncClient client = alpha(
43+
KinesisAsyncClient
44+
.builder()
45+
.asyncHttpConfiguration(ClientAsyncHttpConfiguration
46+
.builder()
47+
.httpClientFactory(NettySdkHttpClientFactory.builder()
48+
.trustAllCertificates(true)
49+
.maxConnectionsPerEndpoint(10)
50+
.build())
51+
.build())
52+
).build();
53+
54+
// ExecutorService recordProducer = startProducer();
55+
long start = System.nanoTime();
56+
ResponseIterator<SubscribeToShardResponse, RecordBatchEvent> iterator =
57+
client.subscribeToShardBlocking(SubscribeToShardRequest.builder()
58+
.consumerARN(ALPHA_STREAM_NAME)
59+
.shardId("shardId-000000000000")
60+
.shardIteratorType(ShardIteratorType.LATEST)
61+
.streamName(ALPHA_STREAM_NAME)
62+
.build());
63+
iterator.forEachRemaining(System.out::println);
64+
// subscribeToShardResponseHandler(client).join();
65+
try {
66+
System.out.println("Total time = " + (System.nanoTime() - start));
67+
} catch (Exception e) {
68+
System.out.println("Closing client");
69+
client.close();
70+
}
71+
System.out.println("Closing client");
72+
client.close();
73+
// recordProducer.shutdownNow();
74+
}
75+
76+
private static ExecutorService startProducer() {
77+
KinesisAsyncClient client = alpha(
78+
KinesisAsyncClient
79+
.builder()
80+
.asyncHttpConfiguration(ClientAsyncHttpConfiguration
81+
.builder()
82+
.httpClientFactory(NettySdkHttpClientFactory.builder()
83+
.trustAllCertificates(true)
84+
.maxConnectionsPerEndpoint(10)
85+
.build())
86+
.build())
87+
).build();
88+
ExecutorService recordProducer = Executors.newSingleThreadExecutor();
89+
recordProducer.submit(() -> {
90+
while (true) {
91+
client.putRecord(PutRecordRequest.builder()
92+
.streamName(ALPHA_STREAM_NAME)
93+
.partitionKey(UUID.randomUUID().toString())
94+
.data(ByteBuffer.wrap(randomBytes(100)))
95+
.build())
96+
.join();
97+
try {
98+
Thread.sleep(100);
99+
} catch (InterruptedException e) {
100+
Thread.currentThread().interrupt();
101+
throw new RuntimeException(e);
102+
} catch (Exception e) {
103+
e.printStackTrace();
104+
}
105+
}
106+
});
107+
return recordProducer;
108+
}
109+
110+
private static CompletableFuture<Integer> subscribeToShardResponseHandler(KinesisAsyncClient client) {
111+
return client.subscribeToShard(SubscribeToShardRequest.builder()
112+
.consumerARN(ALPHA_STREAM_NAME)
113+
.shardId("shardId-000000000000")
114+
.shardIteratorType(ShardIteratorType.LATEST)
115+
.streamName(ALPHA_STREAM_NAME)
116+
.build(),
117+
new FlowResponseTransformer<SubscribeToShardResponse, RecordBatchEvent, Integer>() {
118+
AtomicInteger count = new AtomicInteger(0);
119+
120+
@Override
121+
public void responseReceived(SubscribeToShardResponse response) {
122+
System.out.println("Initial Response = " + response);
123+
}
124+
125+
@Override
126+
public void onStream(FlowPublisher<RecordBatchEvent> p) {
127+
p.subscribe(new Subscriber<RecordBatchEvent>() {
128+
@Override
129+
public void onSubscribe(Subscription subscription) {
130+
subscription.request(Long.MAX_VALUE);
131+
}
132+
133+
@Override
134+
public void onNext(RecordBatchEvent recordBatchEvent) {
135+
count.incrementAndGet();
136+
System.out.println("RECORDS = " + recordBatchEvent);
137+
}
138+
139+
@Override
140+
public void onError(Throwable throwable) {
141+
142+
}
143+
144+
@Override
145+
public void onComplete() {
146+
147+
}
148+
});
149+
150+
}
151+
152+
@Override
153+
public void exceptionOccurred(Throwable throwable) {
154+
155+
}
156+
157+
@Override
158+
public Integer complete() {
159+
return count.get();
160+
}
161+
});
162+
}
163+
164+
private static byte[] randomBytes(int numBytes) {
165+
byte[] bytes = new byte[numBytes];
166+
random.nextBytes(bytes);
167+
return bytes;
168+
}
169+
170+
private static <T extends ClientBuilder<?, ?>> T prod(T builder) {
171+
return (T) builder.region(Region.US_EAST_1)
172+
.credentialsProvider(ProfileCredentialsProvider.create("personal"));
173+
}
174+
175+
private static <T extends ClientBuilder<?, ?>> T alpha(T builder) {
176+
return (T) builder.endpointOverride(URI.create("https://aws-kinesis-alpha.corp.amazon.com"))
177+
.region(Region.US_EAST_1)
178+
.credentialsProvider(ProfileCredentialsProvider.create("kinesis-alpha"));
179+
}
180+
181+
private static KinesisAsyncClientBuilder devPerf(KinesisAsyncClientBuilder builder) {
182+
return builder.endpointOverride(URI.create("https://kinesis-devperf2.us-east-1.amazon.com"))
183+
.region(Region.US_EAST_1)
184+
.credentialsProvider(ProfileCredentialsProvider.create("kinesis-dev-perf"));
185+
}
186+
187+
}

0 commit comments

Comments
 (0)