61
61
import java .util .concurrent .ExecutionException ;
62
62
import java .util .concurrent .Executors ;
63
63
import java .util .concurrent .TimeUnit ;
64
- import java .util .concurrent .atomic .AtomicInteger ;
65
64
import java .util .concurrent .atomic .AtomicReference ;
66
65
import java .util .logging .Level ;
67
66
import java .util .logging .Logger ;
@@ -74,9 +73,9 @@ public final class XdsTestClient {
74
73
private final Set <XdsStatsWatcher > watchers = new HashSet <>();
75
74
private final Object lock = new Object ();
76
75
private final List <ManagedChannel > channels = new ArrayList <>();
77
- private final AtomicInteger rpcsStarted = new AtomicInteger ();
78
- private final AtomicInteger rpcsFailed = new AtomicInteger ();
79
- private final AtomicInteger rpcsSucceeded = new AtomicInteger ();
76
+ private final Map < String , Integer > rpcsStartedByMethod = new HashMap <> ();
77
+ private final Map < String , Integer > rpcsFailedByMethod = new HashMap <> ();
78
+ private final Map < String , Integer > rpcsSucceededByMethod = new HashMap <> ();
80
79
81
80
private int numChannels = 1 ;
82
81
private boolean printResponse = false ;
@@ -345,7 +344,6 @@ public void onError(Throwable t) {
345
344
@ Override
346
345
public void onNext (EmptyProtos .Empty response ) {}
347
346
});
348
- rpcsStarted .getAndIncrement ();
349
347
} else if (rpcType == RpcType .UNARY_CALL ) {
350
348
SimpleRequest request = SimpleRequest .newBuilder ().setFillServerId (true ).build ();
351
349
stub .unaryCall (
@@ -385,19 +383,39 @@ public void onNext(SimpleResponse response) {
385
383
}
386
384
}
387
385
});
388
- rpcsStarted .getAndIncrement ();
386
+ } else {
387
+ throw new AssertionError ("Unknown RPC type: " + rpcType );
388
+ }
389
+ synchronized (lock ) {
390
+ Integer startedBase = rpcsStartedByMethod .get (rpcType .name ());
391
+ if (startedBase == null ) {
392
+ startedBase = 0 ;
393
+ }
394
+ rpcsStartedByMethod .put (rpcType .name (), startedBase + 1 );
389
395
}
390
396
}
391
397
392
398
private void handleRpcCompleted (long requestId , RpcType rpcType , String hostname ,
393
399
Set <XdsStatsWatcher > watchers ) {
394
- rpcsSucceeded .getAndIncrement ();
400
+ synchronized (lock ) {
401
+ Integer succeededBase = rpcsSucceededByMethod .get (rpcType .name ());
402
+ if (succeededBase == null ) {
403
+ succeededBase = 0 ;
404
+ }
405
+ rpcsSucceededByMethod .put (rpcType .name (), succeededBase + 1 );
406
+ }
395
407
notifyWatchers (watchers , rpcType , requestId , hostname );
396
408
}
397
409
398
410
private void handleRpcError (long requestId , RpcType rpcType , String hostname ,
399
411
Set <XdsStatsWatcher > watchers ) {
400
- rpcsFailed .getAndIncrement ();
412
+ synchronized (lock ) {
413
+ Integer failedBase = rpcsFailedByMethod .get (rpcType .name ());
414
+ if (failedBase == null ) {
415
+ failedBase = 0 ;
416
+ }
417
+ rpcsFailedByMethod .put (rpcType .name (), failedBase + 1 );
418
+ }
401
419
notifyWatchers (watchers , rpcType , requestId , hostname );
402
420
}
403
421
}
@@ -469,12 +487,15 @@ public void getClientStats(
469
487
@ Override
470
488
public void getClientAccumulatedStats (LoadBalancerAccumulatedStatsRequest request ,
471
489
StreamObserver <LoadBalancerAccumulatedStatsResponse > responseObserver ) {
472
- responseObserver .onNext (
473
- LoadBalancerAccumulatedStatsResponse .newBuilder ()
474
- .setNumRpcsStarted (rpcsStarted .get ())
475
- .setNumRpcsSucceeded (rpcsSucceeded .get ())
476
- .setNumRpcsFailed (rpcsFailed .get ())
477
- .build ());
490
+ LoadBalancerAccumulatedStatsResponse .Builder responseBuilder =
491
+ LoadBalancerAccumulatedStatsResponse .newBuilder ();
492
+ synchronized (lock ) {
493
+ responseBuilder
494
+ .putAllNumRpcsStartedByMethod (rpcsStartedByMethod )
495
+ .putAllNumRpcsSucceededByMethod (rpcsSucceededByMethod )
496
+ .putAllNumRpcsFailedByMethod (rpcsFailedByMethod );
497
+ }
498
+ responseObserver .onNext (responseBuilder .build ());
478
499
responseObserver .onCompleted ();
479
500
}
480
501
}
0 commit comments