60
60
import java .util .concurrent .ExecutionException ;
61
61
import java .util .concurrent .Executors ;
62
62
import java .util .concurrent .TimeUnit ;
63
- import java .util .concurrent .atomic .AtomicInteger ;
64
63
import java .util .concurrent .atomic .AtomicReference ;
65
64
import java .util .logging .Level ;
66
65
import java .util .logging .Logger ;
@@ -73,9 +72,9 @@ public final class XdsTestClient {
73
72
private final Set <XdsStatsWatcher > watchers = new HashSet <>();
74
73
private final Object lock = new Object ();
75
74
private final List <ManagedChannel > channels = new ArrayList <>();
76
- private final AtomicInteger rpcsStarted = new AtomicInteger ();
77
- private final AtomicInteger rpcsFailed = new AtomicInteger ();
78
- private final AtomicInteger rpcsSucceeded = new AtomicInteger ();
75
+ private final Map < String , Integer > rpcsStartedByMethod = new HashMap <> ();
76
+ private final Map < String , Integer > rpcsFailedByMethod = new HashMap <> ();
77
+ private final Map < String , Integer > rpcsSucceededByMethod = new HashMap <> ();
79
78
80
79
private int numChannels = 1 ;
81
80
private boolean printResponse = false ;
@@ -333,7 +332,6 @@ public void onError(Throwable t) {
333
332
@ Override
334
333
public void onNext (EmptyProtos .Empty response ) {}
335
334
});
336
- rpcsStarted .getAndIncrement ();
337
335
} else if (rpcType == RpcType .UNARY_CALL ) {
338
336
SimpleRequest request = SimpleRequest .newBuilder ().setFillServerId (true ).build ();
339
337
stub .unaryCall (
@@ -373,19 +371,39 @@ public void onNext(SimpleResponse response) {
373
371
}
374
372
}
375
373
});
376
- rpcsStarted .getAndIncrement ();
374
+ } else {
375
+ throw new AssertionError ("Unknown RPC type: " + rpcType );
376
+ }
377
+ synchronized (lock ) {
378
+ Integer startedBase = rpcsStartedByMethod .get (rpcType .name ());
379
+ if (startedBase == null ) {
380
+ startedBase = 0 ;
381
+ }
382
+ rpcsStartedByMethod .put (rpcType .name (), startedBase + 1 );
377
383
}
378
384
}
379
385
380
386
private void handleRpcCompleted (long requestId , RpcType rpcType , String hostname ,
381
387
Set <XdsStatsWatcher > watchers ) {
382
- rpcsSucceeded .getAndIncrement ();
388
+ synchronized (lock ) {
389
+ Integer succeededBase = rpcsSucceededByMethod .get (rpcType .name ());
390
+ if (succeededBase == null ) {
391
+ succeededBase = 0 ;
392
+ }
393
+ rpcsSucceededByMethod .put (rpcType .name (), succeededBase + 1 );
394
+ }
383
395
notifyWatchers (watchers , rpcType , requestId , hostname );
384
396
}
385
397
386
398
private void handleRpcError (long requestId , RpcType rpcType , String hostname ,
387
399
Set <XdsStatsWatcher > watchers ) {
388
- rpcsFailed .getAndIncrement ();
400
+ synchronized (lock ) {
401
+ Integer failedBase = rpcsFailedByMethod .get (rpcType .name ());
402
+ if (failedBase == null ) {
403
+ failedBase = 0 ;
404
+ }
405
+ rpcsFailedByMethod .put (rpcType .name (), failedBase + 1 );
406
+ }
389
407
notifyWatchers (watchers , rpcType , requestId , hostname );
390
408
}
391
409
}
@@ -457,12 +475,15 @@ public void getClientStats(
457
475
@ Override
458
476
public void getClientAccumulatedStats (LoadBalancerAccumulatedStatsRequest request ,
459
477
StreamObserver <LoadBalancerAccumulatedStatsResponse > responseObserver ) {
460
- responseObserver .onNext (
461
- LoadBalancerAccumulatedStatsResponse .newBuilder ()
462
- .setNumRpcsStarted (rpcsStarted .get ())
463
- .setNumRpcsSucceeded (rpcsSucceeded .get ())
464
- .setNumRpcsFailed (rpcsFailed .get ())
465
- .build ());
478
+ LoadBalancerAccumulatedStatsResponse .Builder responseBuilder =
479
+ LoadBalancerAccumulatedStatsResponse .newBuilder ();
480
+ synchronized (lock ) {
481
+ responseBuilder
482
+ .putAllNumRpcsStartedByMethod (rpcsStartedByMethod )
483
+ .putAllNumRpcsSucceededByMethod (rpcsSucceededByMethod )
484
+ .putAllNumRpcsFailedByMethod (rpcsFailedByMethod );
485
+ }
486
+ responseObserver .onNext (responseBuilder .build ());
466
487
responseObserver .onCompleted ();
467
488
}
468
489
}
0 commit comments