6
6
import java .util .Random ;
7
7
import java .util .UUID ;
8
8
import java .util .concurrent .CompletableFuture ;
9
+ import java .util .concurrent .CountDownLatch ;
9
10
import java .util .concurrent .ExecutorService ;
10
11
import java .util .concurrent .Executors ;
12
+ import java .util .concurrent .TimeUnit ;
11
13
import java .util .concurrent .atomic .AtomicInteger ;
12
14
import org .apache .log4j .BasicConfigurator ;
13
15
import org .reactivestreams .Subscriber ;
20
22
import software .amazon .awssdk .core .flow .ResponseIterator ;
21
23
import software .amazon .awssdk .core .regions .Region ;
22
24
import software .amazon .awssdk .http .nio .netty .NettySdkHttpClientFactory ;
23
- import software .amazon .awssdk .services .kinesis .KinesisAsyncClient ;
24
- import software .amazon .awssdk .services .kinesis .KinesisAsyncClientBuilder ;
25
+ import software .amazon .awssdk .services .kinesis .model .DescribeStreamConsumerRequest ;
25
26
import software .amazon .awssdk .services .kinesis .model .PutRecordRequest ;
26
27
import software .amazon .awssdk .services .kinesis .model .RecordBatchEvent ;
28
+ import software .amazon .awssdk .services .kinesis .model .RegisterStreamConsumerResponse ;
27
29
import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
28
30
import software .amazon .awssdk .services .kinesis .model .SubscribeToShardRequest ;
29
31
import software .amazon .awssdk .services .kinesis .model .SubscribeToShardResponse ;
30
32
31
33
public class H2Demo {
32
34
33
- private static final String ALPHA_STREAM_NAME = "foobar" ;
34
- private static final String DEVPERF_STREAM_NAME = "prashray-50 " ;
35
+ private static final String STREAM_NAME = "foobar" ;
36
+ private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:052958737983:stream/foobar/consumer/consumer1:123456 " ;
35
37
36
38
public static final int COUNT = 500_000 ;
37
39
public static final int INTERVAL = 10 ;
38
40
private static final Random random = new Random ();
39
41
40
42
public static void main (String [] args ) throws InterruptedException , UnsupportedEncodingException {
41
43
BasicConfigurator .configure ();
44
+ // KinesisAsyncClient client = alpha(
45
+ // KinesisAsyncClient
46
+ // .builder()
47
+ // .asyncHttpConfiguration(ClientAsyncHttpConfiguration
48
+ // .builder()
49
+ // .httpClientFactory(NettySdkHttpClientFactory.builder()
50
+ // .trustAllCertificates(true)
51
+ // .build())
52
+ // .build())
53
+ // ).build();
54
+
55
+ int numSubscribers = 1 ;
56
+ CountDownLatch latch = new CountDownLatch (numSubscribers );
42
57
KinesisAsyncClient client = alpha (
43
58
KinesisAsyncClient
44
59
.builder ()
45
60
.asyncHttpConfiguration (ClientAsyncHttpConfiguration
46
61
.builder ()
47
62
.httpClientFactory (NettySdkHttpClientFactory .builder ()
48
63
.trustAllCertificates (true )
49
- .maxConnectionsPerEndpoint (10 )
50
64
.build ())
51
65
.build ())
52
66
).build ();
67
+ String streamArn = client .describeStream (r -> r .streamName (STREAM_NAME ))
68
+ .join ().streamDescription ().streamARN ();
69
+ client .describeStreamConsumer (DescribeStreamConsumerRequest .builder ()
70
+ .consumerName ("java-sdk-consumer" )
71
+ .streamARN (streamArn )
72
+ .build ());
73
+ if (true ) {
74
+ System .exit (0 );
75
+ }
76
+ // String streamArn = client.describeStream(r -> r.streamName(STREAM_NAME)).join().streamDescription().streamARN();
77
+ String consumerArn = client .describeStreamConsumer (r -> r .streamARN (streamArn )
78
+ .consumerName ("shorea-consumer" ))
79
+ .join ().consumerDescription ().consumerARN ();
80
+
81
+ ExecutorService recordProducer = startProducer (client );
82
+ ExecutorService subscriberExecutor = Executors .newFixedThreadPool (numSubscribers );
83
+ for (int i = 1 ; i <= numSubscribers ; i ++) {
84
+ int streamNum = i ;
85
+ subscriberExecutor .submit (() -> {
86
+ try {
87
+ subscribeToShardResponseHandler (client , "Stream-" + streamNum , consumerArn ).join ();
88
+
89
+ // ResponseIterator<SubscribeToShardResponse, RecordBatchEvent> iterator =
90
+ // client.subscribeToShardBlocking(SubscribeToShardRequest.builder()
91
+ // .consumerARN(STREAM_NAME)
92
+ // .shardId("shardId-000000000000")
93
+ // .shardIteratorType(ShardIteratorType.LATEST)
94
+ // .consumerARN(consumerArn)
95
+ // .build());
96
+ // System.out.println("Has iterator");
97
+ // iterator.forEachRemaining(System.out::println);
98
+ System .out .println ("Finished processing for stream " + streamNum );
99
+ System .out .println ("Closing client" );
100
+ } catch (Exception e ) {
101
+ e .printStackTrace ();
102
+ System .out .println ("Stream " + streamNum + " failed" );
103
+ } finally {
104
+ latch .countDown ();
105
+ }
106
+
107
+ });
108
+ }
109
+ latch .await ();
110
+ System .out .println ("Closing client" );
111
+ client .close ();
112
+ System .out .println ("All subscribers finished" );
113
+ subscriberExecutor .shutdown ();
114
+ subscriberExecutor .awaitTermination (1000 , TimeUnit .SECONDS );
53
115
54
- // ExecutorService recordProducer = startProducer();
55
116
long start = System .nanoTime ();
56
- ResponseIterator <SubscribeToShardResponse , RecordBatchEvent > iterator =
57
- client .subscribeToShardBlocking (SubscribeToShardRequest .builder ()
58
- .consumerARN (ALPHA_STREAM_NAME )
59
- .shardId ("shardId-000000000000" )
60
- .shardIteratorType (ShardIteratorType .LATEST )
61
- .streamName (ALPHA_STREAM_NAME )
62
- .build ());
63
- iterator .forEachRemaining (System .out ::println );
64
- // subscribeToShardResponseHandler(client).join();
117
+ // ResponseIterator<SubscribeToShardResponse, RecordBatchEvent> iterator =
118
+ // client.subscribeToShardBlocking(SubscribeToShardRequest.builder()
119
+ // .consumerARN(ALPHA_STREAM_NAME)
120
+ // .shardId("shardId-000000000000")
121
+ // .shardIteratorType(ShardIteratorType.LATEST)
122
+ // .streamName(ALPHA_STREAM_NAME)
123
+ // .build());
124
+ // iterator.forEachRemaining(System.out::println);
65
125
try {
66
126
System .out .println ("Total time = " + (System .nanoTime () - start ));
67
127
} catch (Exception e ) {
68
128
System .out .println ("Closing client" );
69
- client .close ();
70
129
}
71
- System .out .println ("Closing client" );
72
- client .close ();
73
- // recordProducer.shutdownNow();
130
+ recordProducer .shutdownNow ();
74
131
}
75
132
76
- private static ExecutorService startProducer () {
77
- KinesisAsyncClient client = alpha (
78
- KinesisAsyncClient
79
- .builder ()
80
- .asyncHttpConfiguration (ClientAsyncHttpConfiguration
81
- .builder ()
82
- .httpClientFactory (NettySdkHttpClientFactory .builder ()
83
- .trustAllCertificates (true )
84
- .maxConnectionsPerEndpoint (10 )
85
- .build ())
86
- .build ())
87
- ).build ();
133
+ private static ExecutorService startProducer (KinesisAsyncClient client ) {
88
134
ExecutorService recordProducer = Executors .newSingleThreadExecutor ();
89
135
recordProducer .submit (() -> {
90
136
while (true ) {
137
+ System .out .println ("Putting record" );
91
138
client .putRecord (PutRecordRequest .builder ()
92
- .streamName (ALPHA_STREAM_NAME )
139
+ .streamName (STREAM_NAME )
93
140
.partitionKey (UUID .randomUUID ().toString ())
94
- .data (ByteBuffer .wrap (randomBytes (100 )))
141
+ .data (ByteBuffer .wrap (randomBytes (1000 * 100 )))
95
142
.build ())
96
143
.join ();
97
144
try {
@@ -107,19 +154,19 @@ private static ExecutorService startProducer() {
107
154
return recordProducer ;
108
155
}
109
156
110
- private static CompletableFuture <Integer > subscribeToShardResponseHandler (KinesisAsyncClient client ) {
157
+ private static CompletableFuture <Integer > subscribeToShardResponseHandler (KinesisAsyncClient client , String prefix , String consumerArn ) {
111
158
return client .subscribeToShard (SubscribeToShardRequest .builder ()
112
- .consumerARN (ALPHA_STREAM_NAME )
159
+ .consumerARN (CONSUMER_ARN )
113
160
.shardId ("shardId-000000000000" )
114
161
.shardIteratorType (ShardIteratorType .LATEST )
115
- .streamName ( ALPHA_STREAM_NAME )
162
+ .consumerARN ( consumerArn )
116
163
.build (),
117
164
new FlowResponseTransformer <SubscribeToShardResponse , RecordBatchEvent , Integer >() {
118
165
AtomicInteger count = new AtomicInteger (0 );
119
166
120
167
@ Override
121
168
public void responseReceived (SubscribeToShardResponse response ) {
122
- System .out .println (" Initial Response = " + response );
169
+ System .out .println (prefix + ": Initial Response = " + response );
123
170
}
124
171
125
172
@ Override
@@ -133,7 +180,7 @@ public void onSubscribe(Subscription subscription) {
133
180
@ Override
134
181
public void onNext (RecordBatchEvent recordBatchEvent ) {
135
182
count .incrementAndGet ();
136
- System .out .println ("RECORDS = " + recordBatchEvent );
183
+ // System.out.println(prefix + ": Records = " + recordBatchEvent);
137
184
}
138
185
139
186
@ Override
@@ -184,4 +231,10 @@ private static KinesisAsyncClientBuilder devPerf(KinesisAsyncClientBuilder build
184
231
.credentialsProvider (ProfileCredentialsProvider .create ("kinesis-dev-perf" ));
185
232
}
186
233
234
+ private static KinesisAsyncClientBuilder hailstone (KinesisAsyncClientBuilder builder ) {
235
+ return builder .endpointOverride (URI .create ("https://kinesis-hailstoneperf.pdx.amazon.com" ))
236
+ .region (Region .US_EAST_1 )
237
+ .credentialsProvider (ProfileCredentialsProvider .create ("kinesis-hailstone" ));
238
+ }
239
+
187
240
}
0 commit comments