25
25
import java .util .function .BiConsumer ;
26
26
import java .util .stream .Collectors ;
27
27
import software .amazon .awssdk .annotations .SdkInternalApi ;
28
+ import software .amazon .awssdk .utils .Logger ;
28
29
29
30
@ SdkInternalApi
30
31
public final class BatchBuffer <RequestT , ResponseT > {
31
32
private final Object flushLock = new Object ();
32
33
33
34
private final Map <String , BatchingExecutionContext <RequestT , ResponseT >> idToBatchContext ;
34
35
36
+ // TODO: Figure out better name for nextId and nextBatchEntry.
35
37
/**
36
38
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
37
39
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
38
40
* response pair is received.
39
41
*/
40
- private final AtomicInteger nextId ;
42
+ private int nextId ;
41
43
42
44
/**
43
45
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
44
46
* request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added
45
47
* to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
46
48
*/
47
- private final AtomicInteger nextBatchEntry ;
49
+ private int nextBatchEntry ;
48
50
49
51
/**
50
52
* The scheduled flush tasks associated with this batchBuffer.
@@ -53,8 +55,8 @@ public final class BatchBuffer<RequestT, ResponseT> {
53
55
54
56
public BatchBuffer (ScheduledFuture <?> scheduledFlush ) {
55
57
this .idToBatchContext = new ConcurrentHashMap <>();
56
- this .nextId = new AtomicInteger ( 0 ) ;
57
- this .nextBatchEntry = new AtomicInteger ( 0 ) ;
58
+ this .nextId = 0 ;
59
+ this .nextBatchEntry = 0 ;
58
60
this .scheduledFlush = scheduledFlush ;
59
61
}
60
62
@@ -79,7 +81,8 @@ public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableSched
79
81
private Map <String , BatchingExecutionContext <RequestT , ResponseT >> extractFlushedEntries (int maxBatchItems ) {
80
82
LinkedHashMap <String , BatchingExecutionContext <RequestT , ResponseT >> requestEntries = new LinkedHashMap <>();
81
83
String nextEntry ;
82
- while (requestEntries .size () < maxBatchItems && (nextEntry = nextBatchEntry ()) != null ) {
84
+ while (requestEntries .size () < maxBatchItems && hasNextBatchEntry ()) {
85
+ nextEntry = nextBatchEntry ();
83
86
requestEntries .put (nextEntry , idToBatchContext .get (nextEntry ));
84
87
idToBatchContext .remove (nextEntry );
85
88
}
@@ -94,30 +97,25 @@ public CompletableFuture<ResponseT> getResponse(String key) {
94
97
return idToBatchContext .get (key ).response ();
95
98
}
96
99
97
- // TODO: Needs to be in a lock to maintain insertion order. Not sure if there is any other way to accomplish this. I tried to
98
- // do this in a do while loop but it still ended up resulting in an incorrect insertion order.
99
100
public BatchingExecutionContext <RequestT , ResponseT > put (RequestT request , CompletableFuture <ResponseT > response ) {
100
101
synchronized (this ) {
101
- String id = BatchUtils .getAndIncrementId (nextId );
102
+ if (nextId == Integer .MAX_VALUE ) {
103
+ nextId = 0 ;
104
+ }
105
+ String id = Integer .toString (nextId ++);
102
106
return idToBatchContext .put (id , new BatchingExecutionContext <>(request , response ));
103
107
}
104
108
}
105
109
106
- private String nextBatchEntry () {
107
- int currentNextBatchEntry ;
108
- int newNextBatchEntry ;
109
- do {
110
- currentNextBatchEntry = nextBatchEntry .get ();
111
- newNextBatchEntry = currentNextBatchEntry + 1 ;
112
- if (!idToBatchContext .containsKey (Integer .toString (currentNextBatchEntry ))) {
113
- newNextBatchEntry = currentNextBatchEntry ;
114
- }
115
- } while (!nextBatchEntry .compareAndSet (currentNextBatchEntry , newNextBatchEntry ));
110
+ private boolean hasNextBatchEntry () {
111
+ return idToBatchContext .containsKey (Integer .toString (nextBatchEntry ));
112
+ }
116
113
117
- if (currentNextBatchEntry != newNextBatchEntry ) {
118
- return Integer .toString (currentNextBatchEntry );
114
+ private String nextBatchEntry () {
115
+ if (nextBatchEntry == Integer .MAX_VALUE ) {
116
+ nextBatchEntry = 0 ;
119
117
}
120
- return null ;
118
+ return Integer . toString ( nextBatchEntry ++) ;
121
119
}
122
120
123
121
public void putScheduledFlush (ScheduledFuture <?> scheduledFlush ) {
@@ -138,9 +136,4 @@ public Collection<CompletableFuture<ResponseT>> responses() {
138
136
public void clear () {
139
137
idToBatchContext .clear ();
140
138
}
141
-
142
- // TODO: Only for debugging
143
- public void forEach (BiConsumer <String , BatchingExecutionContext <RequestT , ResponseT >> action ) {
144
- idToBatchContext .forEach (action );
145
- }
146
139
}
0 commit comments