19
19
import java .util .List ;
20
20
import java .util .Queue ;
21
21
import java .util .concurrent .CancellationException ;
22
+ import java .util .concurrent .CompletableFuture ;
22
23
import java .util .concurrent .ConcurrentLinkedQueue ;
23
24
import java .util .concurrent .ScheduledExecutorService ;
24
25
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -38,7 +39,7 @@ public class ReceiveQueueBuffer {
38
39
private final QueueAttributesManager queueAttributesManager ;
39
40
40
41
private final Queue <ReceiveSqsMessageHelper > finishedTasks = new ConcurrentLinkedQueue <>();
41
- private final Queue <ReceiveMessageCompletableFuture > futures = new ConcurrentLinkedQueue <>();
42
+ private final Queue <FutureRequestWrapper > futures = new ConcurrentLinkedQueue <>();
42
43
43
44
private final AtomicInteger inflightReceiveMessageBatches = new AtomicInteger (0 );
44
45
private final AtomicBoolean shutDown = new AtomicBoolean (false );
@@ -55,8 +56,8 @@ public ReceiveQueueBuffer(ScheduledExecutorService executor, SqsAsyncClient sqsC
55
56
this .queueAttributesManager = queueAttributesManager ;
56
57
}
57
58
58
- public void receiveMessage (ReceiveMessageCompletableFuture receiveMessageFuture ) {
59
- futures .add (receiveMessageFuture );
59
+ public void receiveMessage (CompletableFuture < ReceiveMessageResponse > receiveMessageFuture , int numMessages ) {
60
+ futures .add (new FutureRequestWrapper ( receiveMessageFuture , numMessages ) );
60
61
satisfyFuturesFromBuffer ();
61
62
spawnMoreReceiveTasks ();
62
63
}
@@ -79,16 +80,15 @@ public void shutdown() {
79
80
}
80
81
81
82
// Clear futures
82
- futures .forEach (future -> {
83
- if (!future . responseCompletableFuture ().isDone ()) {
84
- future . setFailure (new CancellationException ("Shutdown in progress" ));
83
+ futures .forEach (futureWrapper -> {
84
+ if (!futureWrapper . getFuture ().isDone ()) {
85
+ futureWrapper . getFuture (). completeExceptionally (new CancellationException ("Shutdown in progress" ));
85
86
}
86
87
});
87
88
futures .clear ();
88
89
}
89
90
}
90
91
91
-
92
92
private void spawnMoreReceiveTasks () {
93
93
if (shutDown .get ()) {
94
94
return ;
@@ -121,7 +121,7 @@ private int determineDesiredBatches() {
121
121
122
122
if (config .adaptivePrefetching ()) {
123
123
int totalRequested = futures .stream ()
124
- .mapToInt (ReceiveMessageCompletableFuture :: requestedSize )
124
+ .mapToInt (FutureRequestWrapper :: getRequestedSize )
125
125
.sum ();
126
126
int batchesNeededToFulfillFutures = (int ) Math .ceil ((float ) totalRequested / config .maxBatchItems ());
127
127
desiredBatches = Math .min (batchesNeededToFulfillFutures , desiredBatches );
@@ -130,19 +130,19 @@ private int determineDesiredBatches() {
130
130
return desiredBatches ;
131
131
}
132
132
133
- private void fulfillFuture (ReceiveMessageCompletableFuture future , ReceiveSqsMessageHelper messageHelper ) {
133
+ private void fulfillFuture (FutureRequestWrapper futureWrapper , ReceiveSqsMessageHelper messageHelper ) {
134
134
List <Message > messages = new LinkedList <>();
135
135
Throwable exception = messageHelper .getException ();
136
136
int numRetrieved = 0 ;
137
137
boolean batchDone = false ;
138
138
139
139
if (exception != null ) {
140
- future . setFailure (exception );
140
+ futureWrapper . getFuture (). completeExceptionally (exception );
141
141
finishedTasks .poll ();
142
142
return ;
143
143
}
144
144
145
- while (numRetrieved < future . requestedSize ()) {
145
+ while (numRetrieved < futureWrapper . getRequestedSize ()) {
146
146
Message msg = messageHelper .removeMessage ();
147
147
if (msg != null ) {
148
148
messages .add (msg );
@@ -156,7 +156,7 @@ private void fulfillFuture(ReceiveMessageCompletableFuture future, ReceiveSqsMes
156
156
if (batchDone ) {
157
157
finishedTasks .poll ();
158
158
}
159
- future . setSuccess (ReceiveMessageResponse .builder ().messages (messages ).build ());
159
+ futureWrapper . getFuture (). complete (ReceiveMessageResponse .builder ().messages (messages ).build ());
160
160
}
161
161
162
162
private void satisfyFuturesFromBuffer () {
@@ -177,7 +177,10 @@ private void satisfyFuturesFromBuffer() {
177
177
}
178
178
179
179
private void pruneExpiredTasks () {
180
- futures .removeIf (ReceiveMessageCompletableFuture ::isExpired );
180
+ futures .removeIf (futureWrapper -> {
181
+ CompletableFuture <ReceiveMessageResponse > future = futureWrapper .getFuture ();
182
+ return future .isDone ();
183
+ });
181
184
}
182
185
183
186
private void reportBatchFinished (ReceiveSqsMessageHelper batch ) {
@@ -186,4 +189,23 @@ private void reportBatchFinished(ReceiveSqsMessageHelper batch) {
186
189
satisfyFuturesFromBuffer ();
187
190
spawnMoreReceiveTasks ();
188
191
}
192
+
193
+
194
+ private static class FutureRequestWrapper {
195
+ private final CompletableFuture <ReceiveMessageResponse > future ;
196
+ private final int requestedSize ;
197
+
198
+ FutureRequestWrapper (CompletableFuture <ReceiveMessageResponse > future , int requestedSize ) {
199
+ this .future = future ;
200
+ this .requestedSize = requestedSize ;
201
+ }
202
+
203
+ public CompletableFuture <ReceiveMessageResponse > getFuture () {
204
+ return future ;
205
+ }
206
+
207
+ public int getRequestedSize () {
208
+ return requestedSize ;
209
+ }
210
+ }
189
211
}
0 commit comments