29
29
import java .util .Iterator ;
30
30
import java .util .List ;
31
31
import java .util .Map ;
32
+ import java .util .concurrent .ConcurrentHashMap ;
32
33
import java .util .concurrent .ScheduledExecutorService ;
33
34
import java .util .concurrent .ScheduledFuture ;
34
35
import java .util .concurrent .TimeUnit ;
36
+ import java .util .concurrent .atomic .AtomicBoolean ;
35
37
36
38
public class PersistentConnectionImpl implements Connection .Delegate , PersistentConnection {
37
39
@@ -111,6 +113,41 @@ public String toString() {
111
113
}
112
114
}
113
115
116
+ private static class OutstandingGet {
117
+ private Map <String , Object > request ;
118
+ private ConnectionRequestCallback onComplete ;
119
+ private String action ;
120
+ private AtomicBoolean sent ;
121
+
122
+ private OutstandingGet (
123
+ String action , Map <String , Object > request , ConnectionRequestCallback onComplete ) {
124
+ this .action = action ;
125
+ this .request = request ;
126
+ this .onComplete = onComplete ;
127
+ this .sent = new AtomicBoolean (false );
128
+ }
129
+
130
+ private String getAction () {
131
+ return action ;
132
+ }
133
+
134
+ private ConnectionRequestCallback getOnComplete () {
135
+ return onComplete ;
136
+ }
137
+
138
+ private Map <String , Object > getRequest () {
139
+ return request ;
140
+ }
141
+
142
+ private boolean markSent () {
143
+ return sent .compareAndSet (false , true );
144
+ }
145
+
146
+ private boolean wasSent () {
147
+ return sent .get ();
148
+ }
149
+ }
150
+
114
151
private static class OutstandingPut {
115
152
private String action ;
116
153
private Map <String , Object > request ;
@@ -234,6 +271,7 @@ private enum ConnectionState {
234
271
private static final long SUCCESSFUL_CONNECTION_ESTABLISHED_DELAY = 30 * 1000 ;
235
272
236
273
private static final long IDLE_TIMEOUT = 60 * 1000 ;
274
+ private static final long GET_CONNECT_TIMEOUT = 3 * 1000 ;
237
275
238
276
/** If auth fails repeatedly, we'll assume something is wrong and log a warning / back off. */
239
277
private static final long INVALID_AUTH_TOKEN_THRESHOLD = 3 ;
@@ -253,11 +291,13 @@ private enum ConnectionState {
253
291
private Connection realtime ;
254
292
private ConnectionState connectionState = ConnectionState .Disconnected ;
255
293
private long writeCounter = 0 ;
294
+ private long readCounter = 0 ;
256
295
private long requestCounter = 0 ;
257
296
private Map <Long , ConnectionRequestCallback > requestCBHash ;
258
297
259
298
private List <OutstandingDisconnect > onDisconnectRequestQueue ;
260
299
private Map <Long , OutstandingPut > outstandingPuts ;
300
+ private Map <Long , OutstandingGet > outstandingGets ;
261
301
262
302
private Map <QuerySpec , OutstandingListen > listens ;
263
303
private String authToken ;
@@ -287,6 +327,7 @@ public PersistentConnectionImpl(
287
327
this .listens = new HashMap <QuerySpec , OutstandingListen >();
288
328
this .requestCBHash = new HashMap <Long , ConnectionRequestCallback >();
289
329
this .outstandingPuts = new HashMap <Long , OutstandingPut >();
330
+ this .outstandingGets = new ConcurrentHashMap <Long , OutstandingGet >();
290
331
this .onDisconnectRequestQueue = new ArrayList <OutstandingDisconnect >();
291
332
this .retryHelper =
292
333
new RetryHelper .Builder (this .executorService , context .getLogger (), "ConnectionRetryHelper" )
@@ -351,15 +392,58 @@ public void listen(
351
392
public Task <Object > get (List <String > path , Map <String , Object > queryParams ) {
352
393
QuerySpec query = new QuerySpec (path , queryParams );
353
394
TaskCompletionSource <Object > source = new TaskCompletionSource <>();
354
- Task <Object > task ;
355
- if (connected ()) {
356
- task = sendGet (query , source );
357
- } else {
358
- source .setException (new Exception ("Client is offline" ));
359
- task = source .getTask ();
395
+
396
+ long readId = this .readCounter ++;
397
+
398
+ Map <String , Object > request = new HashMap <String , Object >();
399
+ request .put (REQUEST_PATH , ConnectionUtils .pathToString (query .path ));
400
+ request .put (REQUEST_QUERIES , query .queryParams );
401
+
402
+ outstandingGets .put (
403
+ readId ,
404
+ new OutstandingGet (
405
+ REQUEST_ACTION_GET ,
406
+ request ,
407
+ new ConnectionRequestCallback () {
408
+ @ Override
409
+ public void onResponse (Map <String , Object > response ) {
410
+ String status = (String ) response .get (REQUEST_STATUS );
411
+ if (status .equals ("ok" )) {
412
+ Object body = response .get (SERVER_DATA_UPDATE_BODY );
413
+ delegate .onDataUpdate (query .path , body , /*isMerge=*/ false , /*tagNumber=*/ null );
414
+ source .setResult (body );
415
+ } else {
416
+ source .setException (
417
+ new Exception ((String ) response .get (SERVER_DATA_UPDATE_BODY )));
418
+ }
419
+ }
420
+ }));
421
+
422
+ if (!connected ()) {
423
+ executorService .schedule (
424
+ new Runnable () {
425
+ @ Override
426
+ public void run () {
427
+ OutstandingGet get = outstandingGets .get (readId );
428
+ if (get == null || !get .markSent ()) {
429
+ return ;
430
+ }
431
+ if (logger .logsDebug ()) {
432
+ logger .debug ("get " + readId + " timed out waiting for connection" );
433
+ }
434
+ outstandingGets .remove (readId );
435
+ source .setException (new Exception ("Client is offline" ));
436
+ }
437
+ },
438
+ GET_CONNECT_TIMEOUT ,
439
+ TimeUnit .MILLISECONDS );
440
+ }
441
+
442
+ if (canSendReads ()) {
443
+ sendGet (readId );
360
444
}
361
445
doIdleCheck ();
362
- return task ;
446
+ return source . getTask () ;
363
447
}
364
448
365
449
@ Override
@@ -508,6 +592,10 @@ private boolean canSendWrites() {
508
592
return connectionState == ConnectionState .Connected ;
509
593
}
510
594
595
+ private boolean canSendReads () {
596
+ return connectionState == ConnectionState .Connected ;
597
+ }
598
+
511
599
@ Override
512
600
public void onDisconnectMerge (
513
601
List <String > path , Map <String , Object > updates , final RequestResultCallback onComplete ) {
@@ -979,6 +1067,13 @@ private void restoreState() {
979
1067
sendPut (put );
980
1068
}
981
1069
1070
+ if (logger .logsDebug ()) logger .debug ("Restoring reads." );
1071
+ ArrayList <Long > outstandingGetKeys = new ArrayList <Long >(outstandingGets .keySet ());
1072
+ Collections .sort (outstandingGetKeys );
1073
+ for (Long getId : outstandingGetKeys ) {
1074
+ sendGet (getId );
1075
+ }
1076
+
982
1077
// Restore disconnect operations
983
1078
for (OutstandingDisconnect disconnect : onDisconnectRequestQueue ) {
984
1079
sendOnDisconnect (
@@ -1067,27 +1162,32 @@ public void onResponse(Map<String, Object> response) {
1067
1162
});
1068
1163
}
1069
1164
1070
- private Task <Object > sendGet (final QuerySpec query , TaskCompletionSource <Object > source ) {
1071
- Map <String , Object > request = new HashMap <String , Object >();
1072
- request .put (REQUEST_PATH , ConnectionUtils .pathToString (query .path ));
1073
- request .put (REQUEST_QUERIES , query .queryParams );
1165
+ private void sendGet (final Long readId ) {
1166
+ hardAssert (canSendReads (), "sendGet called when we can't send gets" );
1167
+ OutstandingGet get = outstandingGets .get (readId );
1168
+ if (!get .markSent ()) {
1169
+ if (logger .logsDebug ()) {
1170
+ logger .debug ("get" + readId + " already sent or cancelled, ignoring." );
1171
+ return ;
1172
+ }
1173
+ }
1074
1174
sendAction (
1075
- REQUEST_ACTION_GET ,
1076
- request ,
1175
+ get . getAction () ,
1176
+ get . getRequest () ,
1077
1177
new ConnectionRequestCallback () {
1078
1178
@ Override
1079
1179
public void onResponse (Map <String , Object > response ) {
1080
- String status = (String ) response .get (REQUEST_STATUS );
1081
- if (status .equals ("ok" )) {
1082
- Object body = response .get (SERVER_DATA_UPDATE_BODY );
1083
- delegate .onDataUpdate (query .path , body , /*isMerge=*/ false , /*tagNumber=*/ null );
1084
- source .setResult (body );
1180
+ OutstandingGet currentGet = outstandingGets .get (readId );
1181
+ if (currentGet == get ) {
1182
+ outstandingGets .remove (readId );
1183
+ get .getOnComplete ().onResponse (response );
1085
1184
} else {
1086
- source .setException (new Exception ((String ) response .get (SERVER_DATA_UPDATE_BODY )));
1185
+ if (logger .logsDebug ())
1186
+ logger .debug (
1187
+ "Ignoring on complete for get " + readId + " because it was removed already." );
1087
1188
}
1088
1189
}
1089
1190
});
1090
- return source .getTask ();
1091
1191
}
1092
1192
1093
1193
private void sendListen (final OutstandingListen listen ) {
0 commit comments