Skip to content

Commit 3c8d25c

Browse files
author
Michael Li
committed
Addressing half of the PR comments. Have questions on the rest
1 parent a7965d7 commit 3c8d25c

File tree

6 files changed

+39
-39
lines changed

6 files changed

+39
-39
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchBuffer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,21 @@ public BatchBuffer(ScheduledFuture<?> scheduledFlush) {
5858
this.scheduledFlush = scheduledFlush;
5959
}
6060

61-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> canManualFlush(int maxBatchItems) {
61+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(int maxBatchItems) {
6262
synchronized (flushLock) {
6363
if (idToBatchContext.size() >= maxBatchItems) {
6464
return extractFlushedEntries(maxBatchItems);
6565
}
66-
return null;
66+
return new ConcurrentHashMap<>();
6767
}
6868
}
6969

70-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> canScheduledFlush(int maxBatchItems) {
70+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
7171
synchronized (flushLock) {
7272
if (idToBatchContext.size() > 0) {
7373
return extractFlushedEntries(maxBatchItems);
7474
}
75-
return null;
75+
return new ConcurrentHashMap<>();
7676
}
7777
}
7878

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchManager.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,21 @@ public CompletableFuture<ResponseT> sendRequest(RequestT request) {
107107
() -> scheduleBufferFlush(batchKey, maxBatchOpenInMs.toMillis(), scheduledExecutor),
108108
request,
109109
response);
110-
111-
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
112-
requestsAndResponsesMaps.canManualFlush(batchKey, maxBatchItems);
113-
if (flushableRequests != null) {
114-
manualFlushBuffer(batchKey, flushableRequests);
115-
}
110+
flushBufferIfNeeded(batchKey);
116111
} catch (Exception e) {
117112
response.completeExceptionally(e);
118113
}
119114
return response;
120115
}
121116

117+
private void flushBufferIfNeeded(String batchKey) {
118+
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
119+
requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems);
120+
if (!flushableRequests.isEmpty()) {
121+
manualFlushBuffer(batchKey, flushableRequests);
122+
}
123+
}
124+
122125
private void manualFlushBuffer(String batchKey,
123126
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
124127
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
@@ -168,20 +171,21 @@ private ScheduledFuture<?> scheduleBufferFlush(String batchKey, long timeOutInMs
168171
}
169172

170173
private void performScheduledFlush(String batchKey) {
171-
Map<String, BatchingExecutionContext<RequestT, ResponseT>> requests =
172-
requestsAndResponsesMaps.canScheduledFlush(batchKey, maxBatchItems);
173-
if (requests != null) {
174-
flushBuffer(batchKey, requests);
174+
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
175+
requestsAndResponsesMaps.flushableScheduledRequests(batchKey, maxBatchItems);
176+
if (!flushableRequests.isEmpty()) {
177+
flushBuffer(batchKey, flushableRequests);
175178
}
176179
}
177180

178181
public void close() {
179182
requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> {
180183
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
184+
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
185+
requestsAndResponsesMaps.flushableScheduledRequests(batchKey, maxBatchItems);
181186

182-
Map<String, BatchingExecutionContext<RequestT, ResponseT>> requests;
183-
while ((requests = requestsAndResponsesMaps.canScheduledFlush(batchKey, maxBatchItems)) != null) {
184-
flushBuffer(batchKey, requests);
187+
while (!flushableRequests.isEmpty()) {
188+
flushBuffer(batchKey, flushableRequests);
185189
}
186190
});
187191
requestsAndResponsesMaps.waitForFlushesAndClear(log);

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchUtils.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,12 @@ private BatchUtils() {
2525
}
2626

2727
public static String getAndIncrementId(AtomicInteger id) {
28-
int currentId;
29-
int newCurrentId;
30-
do {
31-
currentId = id.get();
32-
newCurrentId = currentId + 1;
33-
if (newCurrentId == Integer.MAX_VALUE) {
34-
newCurrentId = 0;
35-
}
36-
} while (!id.compareAndSet(currentId, newCurrentId));
28+
int currentId = id.get();
29+
int newCurrentId = currentId + 1;
30+
if (newCurrentId == Integer.MAX_VALUE) {
31+
newCurrentId = 0;
32+
}
33+
id.set(newCurrentId);
3734
return Integer.toString(currentId);
3835
}
3936
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchingMap.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ public void forEach(BiConsumer<String, BatchBuffer<RequestT, ResponseT>> action)
5454
batchContextMap.forEach(action);
5555
}
5656

57-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> canManualFlush(String batchKey,
58-
int maxBatchItems) {
59-
return batchContextMap.get(batchKey).canManualFlush(maxBatchItems);
57+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey,
58+
int maxBatchItems) {
59+
return batchContextMap.get(batchKey).flushableRequests(maxBatchItems);
6060
}
6161

62-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> canScheduledFlush(String batchKey,
63-
int maxBatchItems) {
64-
return batchContextMap.get(batchKey).canScheduledFlush(maxBatchItems);
62+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
63+
int maxBatchItems) {
64+
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);
6565
}
6666

6767
public void cancelScheduledFlush(String batchKey) {

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/batchmanager/IdentifiableMessageTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ public void checkIdenticalIdentifiableMessagesAreEqual() {
3636
IdentifiableMessage<String> myRequest1 = new IdentifiableMessage<>(id, request);
3737
IdentifiableMessage<String> myRequest2 = new IdentifiableMessage<>(id, request);
3838
Assert.assertEquals(myRequest1, myRequest2);
39+
Assert.assertEquals(myRequest1.hashCode(), myRequest2.hashCode());
3940
}
4041

4142
@Test
42-
public void identifiableMessageHashCode() {
43-
String id = "id";
44-
String request = "request";
45-
IdentifiableMessage<String> myRequest1 = new IdentifiableMessage<>(id, request);
46-
IdentifiableMessage<String> myRequest2 = new IdentifiableMessage<>(id, request);
47-
Assert.assertEquals(myRequest1.hashCode(), myRequest2.hashCode());
43+
public void checkIdenticalIdentifiableMessagesAreNotEqual() {
44+
IdentifiableMessage<String> myRequest1 = new IdentifiableMessage<>("id1", "request1");
45+
IdentifiableMessage<String> myRequest2 = new IdentifiableMessage<>("id2", "request2");
46+
Assert.assertNotEquals(myRequest1, myRequest2);
47+
Assert.assertNotEquals(myRequest1.hashCode(), myRequest2.hashCode());
4848
}
4949

5050
}

services/sqs/src/it/java/software/amazon/awssdk/services/sqs/BatchManagerSqsIntegrationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ public void scheduleFiveMessagesWithEachThreadToDifferentLocations() {
159159
checkThreadedResponses(requests, responses, sendRequestFutures);
160160
}
161161

162-
// Sometimes it passes a null identifiedRequests;
163162
BatchAndSend<SendMessageRequest, SendMessageBatchResponse> batchingFunction =
164163
(identifiedRequests, destination) -> {
165164
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(identifiedRequests.size());

0 commit comments

Comments
 (0)