Skip to content

Commit e63a9f9

Browse files
committed
resizing refactored
RequestRegistry added locks
1 parent 771b1d3 commit e63a9f9

File tree

2 files changed

+42
-27
lines changed

2 files changed

+42
-27
lines changed

src/main/java/com/swiftmq/tools/collection/OrderedSet.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,22 @@ public OrderedSet(int max) {
3131

3232
public void increaseSize(int extend) {
3333
this.max += extend;
34+
System.out.println("increase size by " + extend + " to " + max);
3435
}
3536

3637
public void decreaseSize(int reduce, int minSize) {
3738
this.max = Math.max(minSize, this.max - reduce);
38-
this.resize(this.max);
39+
this.reduceToSIze(this.max);
3940
}
4041

4142
public void resize(int newSize) {
43+
if (newSize > max)
44+
max = newSize;
45+
else if (newSize < max)
46+
reduceToSIze(newSize);
47+
}
48+
49+
private void reduceToSIze(int newSize) {
4250
Iterator<Object> iterator = set.iterator();
4351
while (set.size() > newSize && iterator.hasNext()) {
4452
iterator.next();

src/main/java/com/swiftmq/tools/requestreply/RequestRegistry.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.security.PrivilegedAction;
2828
import java.util.Set;
2929
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.concurrent.locks.ReentrantReadWriteLock;
3132

3233
public class RequestRegistry implements TimerListener {
@@ -36,9 +37,9 @@ public class RequestRegistry implements TimerListener {
3637
static boolean wrapPrivileged = false;
3738
ConcurrentExpandableList<Request> requestList = new ConcurrentExpandableList<>();
3839
RequestHandler requestHandler = null;
39-
boolean valid = true;
40-
boolean paused = false;
41-
volatile boolean requestTimeoutEnabled = true;
40+
final AtomicBoolean valid = new AtomicBoolean(true);
41+
final AtomicBoolean paused = new AtomicBoolean(false);
42+
final AtomicBoolean requestTimeoutEnabled = new AtomicBoolean(true);
4243
Semaphore retrySem = null;
4344
Set<Request> retrySet = ConcurrentHashMap.newKeySet();
4445
String debugString = null;
@@ -56,15 +57,15 @@ public static void setWrapPrivileged(boolean wrapPrivileged) {
5657
}
5758

5859
public void setRequestTimeoutEnabled(boolean requestTimeoutEnabled) {
59-
this.requestTimeoutEnabled = requestTimeoutEnabled;
60+
this.requestTimeoutEnabled.set(requestTimeoutEnabled);
6061
if (requestTimeoutEnabled)
6162
TimerRegistry.Singleton().addTimerListener(TIMEOUT_CHECKINTERVAL, this);
6263
}
6364

6465
public void setPaused(boolean paused) {
6566
lock.writeLock().lock();
6667
try {
67-
this.paused = paused;
68+
this.paused.set(paused);
6869
} finally {
6970
lock.writeLock().unlock();
7071
}
@@ -137,24 +138,30 @@ public Reply request(Request req) {
137138
}
138139

139140
private void processRequest(Request req) {
140-
if (!valid)
141-
throw new RuntimeException("Invalid request (connection might be closed already)");
142-
143-
req.setReply(null);
144-
req.setDoRetry(false);
145-
if (requestTimeoutEnabled)
146-
req.setTimeout(System.currentTimeMillis() + SWIFTMQ_REQUEST_TIMEOUT);
147-
148-
// find next free index or add request to the end of the list
149-
req.setRequestNumber(requestList.add(req));
150-
151-
// perform request via request handler
152-
if (!paused)
153-
requestHandler.performRequest(req);
154-
else {
155-
if (DEBUG) System.out.println(debugString + ": Paused, request NOT sent: " + req);
141+
lock.writeLock().lock();
142+
try {
143+
if (!valid.get())
144+
throw new RuntimeException("Invalid request (connection might be closed already)");
145+
146+
req.setReply(null);
147+
req.setDoRetry(false);
148+
if (requestTimeoutEnabled.get())
149+
req.setTimeout(System.currentTimeMillis() + SWIFTMQ_REQUEST_TIMEOUT);
150+
151+
// find next free index or add request to the end of the list
152+
req.setRequestNumber(requestList.add(req));
153+
154+
// perform request via request handler
155+
if (!paused.get())
156+
requestHandler.performRequest(req);
157+
else {
158+
if (DEBUG) System.out.println(debugString + ": Paused, request NOT sent: " + req);
159+
}
160+
} finally {
161+
lock.writeLock().unlock();
156162
}
157163

164+
158165
}
159166

160167
private Semaphore setReplySynchronized(Reply reply) {
@@ -214,7 +221,7 @@ public void cancelAllRequests(TransportException exception, boolean valid) {
214221
retrySem.notifySingleWaiter();
215222
retrySem = null;
216223
}
217-
this.valid = valid;
224+
this.valid.set(valid);
218225
} finally {
219226
lock.writeLock().unlock();
220227
}
@@ -290,7 +297,7 @@ public void performTimeAction(TimerEvent evt) {
290297
try {
291298
long actTime = System.currentTimeMillis();
292299
for (int i = 0; i < requestList.size(); i++) {
293-
Request req = (Request) requestList.get(i);
300+
Request req = requestList.get(i);
294301
if (req != null && req.getTimeout() != -1 && req.getTimeout() < actTime) {
295302
requestList.remove(i);
296303
Reply reply = req.createReply();
@@ -308,11 +315,11 @@ public void performTimeAction(TimerEvent evt) {
308315
}
309316

310317
public void close() {
311-
if (requestTimeoutEnabled)
318+
if (requestTimeoutEnabled.get())
312319
TimerRegistry.Singleton().removeTimerListener(TIMEOUT_CHECKINTERVAL, this);
313320
}
314321

315-
private class PrivilegedRequestHandler implements RequestHandler {
322+
private static class PrivilegedRequestHandler implements RequestHandler {
316323
RequestHandler realHandler = null;
317324

318325
public PrivilegedRequestHandler(RequestHandler realHandler) {
@@ -329,7 +336,7 @@ public Object run() {
329336
}
330337
}
331338

332-
private abstract class PrivilegedRequestAction implements PrivilegedAction {
339+
private abstract static class PrivilegedRequestAction implements PrivilegedAction {
333340
Request myRequest = null;
334341

335342
public PrivilegedRequestAction(Request request) {

0 commit comments

Comments
 (0)