33
33
import java .util .concurrent .ScheduledExecutorService ;
34
34
import java .util .concurrent .ScheduledFuture ;
35
35
import java .util .concurrent .TimeUnit ;
36
- import java .util .concurrent .atomic .AtomicBoolean ;
37
36
38
37
public class PersistentConnectionImpl implements Connection .Delegate , PersistentConnection {
39
38
@@ -114,17 +113,17 @@ public String toString() {
114
113
}
115
114
116
115
private static class OutstandingGet {
117
- private Map <String , Object > request ;
118
- private ConnectionRequestCallback onComplete ;
119
- private String action ;
120
- private AtomicBoolean sent ;
116
+ private final Map <String , Object > request ;
117
+ private final ConnectionRequestCallback onComplete ;
118
+ private final String action ;
119
+ private boolean sent ;
121
120
122
121
private OutstandingGet (
123
122
String action , Map <String , Object > request , ConnectionRequestCallback onComplete ) {
124
123
this .action = action ;
125
124
this .request = request ;
126
125
this .onComplete = onComplete ;
127
- this .sent = new AtomicBoolean ( false ) ;
126
+ this .sent = false ;
128
127
}
129
128
130
129
private String getAction () {
@@ -140,11 +139,12 @@ private Map<String, Object> getRequest() {
140
139
}
141
140
142
141
private boolean markSent () {
143
- return sent .compareAndSet (false , true );
144
- }
145
-
146
- private boolean wasSent () {
147
- return sent .get ();
142
+ boolean prev = sent ;
143
+ if (prev ) {
144
+ return false ;
145
+ }
146
+ this .sent = true ;
147
+ return true ;
148
148
}
149
149
}
150
150
@@ -399,8 +399,7 @@ public Task<Object> get(List<String> path, Map<String, Object> queryParams) {
399
399
request .put (REQUEST_PATH , ConnectionUtils .pathToString (query .path ));
400
400
request .put (REQUEST_QUERIES , query .queryParams );
401
401
402
- outstandingGets .put (
403
- readId ,
402
+ OutstandingGet outstandingGet =
404
403
new OutstandingGet (
405
404
REQUEST_ACTION_GET ,
406
405
request ,
@@ -417,15 +416,16 @@ public void onResponse(Map<String, Object> response) {
417
416
new Exception ((String ) response .get (SERVER_DATA_UPDATE_BODY )));
418
417
}
419
418
}
420
- }));
419
+ });
420
+ outstandingGets .put (readId , outstandingGet );
421
421
422
422
if (!connected ()) {
423
423
executorService .schedule (
424
424
new Runnable () {
425
425
@ Override
426
426
public void run () {
427
427
OutstandingGet get = outstandingGets .get (readId );
428
- if (get == null || !get .markSent ()) {
428
+ if (get == null || get != outstandingGet || !get .markSent ()) {
429
429
return ;
430
430
}
431
431
if (logger .logsDebug ()) {
@@ -1067,13 +1067,6 @@ private void restoreState() {
1067
1067
sendPut (put );
1068
1068
}
1069
1069
1070
- if (logger .logsDebug ()) logger .debug ("Restoring reads." );
1071
- ArrayList <Long > outstandingGetKeys = new ArrayList <Long >(outstandingGets .keySet ());
1072
- Collections .sort (outstandingGetKeys );
1073
- for (Long getId : outstandingGetKeys ) {
1074
- sendGet (getId );
1075
- }
1076
-
1077
1070
// Restore disconnect operations
1078
1071
for (OutstandingDisconnect disconnect : onDisconnectRequestQueue ) {
1079
1072
sendOnDisconnect (
@@ -1083,6 +1076,13 @@ private void restoreState() {
1083
1076
disconnect .getOnComplete ());
1084
1077
}
1085
1078
onDisconnectRequestQueue .clear ();
1079
+
1080
+ if (logger .logsDebug ()) logger .debug ("Restoring reads." );
1081
+ ArrayList <Long > outstandingGetKeys = new ArrayList <Long >(outstandingGets .keySet ());
1082
+ Collections .sort (outstandingGetKeys );
1083
+ for (Long getId : outstandingGetKeys ) {
1084
+ sendGet (getId );
1085
+ }
1086
1086
}
1087
1087
1088
1088
private void handleTimestamp (long timestamp ) {
@@ -1167,7 +1167,7 @@ private void sendGet(final Long readId) {
1167
1167
OutstandingGet get = outstandingGets .get (readId );
1168
1168
if (!get .markSent ()) {
1169
1169
if (logger .logsDebug ()) {
1170
- logger .debug ("get" + readId + " already sent or cancelled, ignoring." );
1170
+ logger .debug ("get" + readId + " cancelled, ignoring." );
1171
1171
return ;
1172
1172
}
1173
1173
}
@@ -1181,10 +1181,9 @@ public void onResponse(Map<String, Object> response) {
1181
1181
if (currentGet == get ) {
1182
1182
outstandingGets .remove (readId );
1183
1183
get .getOnComplete ().onResponse (response );
1184
- } else {
1185
- if (logger .logsDebug ())
1186
- logger .debug (
1187
- "Ignoring on complete for get " + readId + " because it was removed already." );
1184
+ } else if (logger .logsDebug ()) {
1185
+ logger .debug (
1186
+ "Ignoring on complete for get " + readId + " because it was removed already." );
1188
1187
}
1189
1188
}
1190
1189
});
0 commit comments