@@ -179,9 +179,11 @@ public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exceptio
179
179
AsyncTransactionManager manager = client ().transactionManagerAsync ();
180
180
TransactionContext txn = manager .beginAsync ().get ();
181
181
txn .executeUpdateAsync (UPDATE_STATEMENT ).get ();
182
- final TransactionSelector selector =
183
- ((TransactionContextImpl ) ((SessionPoolTransactionContext ) txn ).delegate )
184
- .getTransactionSelector ();
182
+ if (txn instanceof SessionPoolTransactionContext ) {
183
+ txn = ((SessionPoolTransactionContext ) txn ).delegate ;
184
+ }
185
+ TransactionContextImpl impl = (TransactionContextImpl ) txn ;
186
+ final TransactionSelector selector = impl .getTransactionSelector ();
185
187
186
188
SpannerApiFutures .get (manager .closeAsync ());
187
189
// The mock server should already have the Rollback request, as we are waiting for the returned
@@ -359,7 +361,22 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception
359
361
ExecuteSqlRequest .class ,
360
362
ExecuteSqlRequest .class ,
361
363
CommitRequest .class );
362
- if (isMultiplexedSessionsEnabled ()) {
364
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionForRW =
365
+ ImmutableList .of (
366
+ CreateSessionRequest .class ,
367
+ // The first update that fails. This will cause a transaction retry.
368
+ ExecuteSqlRequest .class ,
369
+ // The retry will use an explicit BeginTransaction call.
370
+ BeginTransactionRequest .class ,
371
+ // The first update will again fail, but now there is a transaction id, so the
372
+ // transaction can continue.
373
+ ExecuteSqlRequest .class ,
374
+ ExecuteSqlRequest .class ,
375
+ CommitRequest .class );
376
+ if (isMultiplexedSessionsEnabledForRW ()) {
377
+ assertThat (mockSpanner .getRequestTypes ())
378
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionForRW );
379
+ } else if (isMultiplexedSessionsEnabled ()) {
363
380
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
364
381
} else {
365
382
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -502,14 +519,25 @@ public void asyncTransactionManagerUpdateAbortedWithoutGettingResult() throws Ex
502
519
// The server may receive 1 or 2 commit requests depending on whether the call to
503
520
// commitAsync() already knows that the transaction has aborted. If it does, it will not
504
521
// attempt to call the Commit RPC and instead directly propagate the Aborted error.
505
- assertThat (mockSpanner .getRequestTypes ())
506
- .containsAtLeast (
507
- BatchCreateSessionsRequest .class ,
508
- ExecuteSqlRequest .class ,
509
- // The retry will use a BeginTransaction RPC.
510
- BeginTransactionRequest .class ,
511
- ExecuteSqlRequest .class ,
512
- CommitRequest .class );
522
+ if (isMultiplexedSessionsEnabledForRW ()) {
523
+ assertThat (mockSpanner .getRequestTypes ())
524
+ .containsAtLeast (
525
+ CreateSessionRequest .class ,
526
+ ExecuteSqlRequest .class ,
527
+ // The retry will use a BeginTransaction RPC.
528
+ BeginTransactionRequest .class ,
529
+ ExecuteSqlRequest .class ,
530
+ CommitRequest .class );
531
+ } else {
532
+ assertThat (mockSpanner .getRequestTypes ())
533
+ .containsAtLeast (
534
+ BatchCreateSessionsRequest .class ,
535
+ ExecuteSqlRequest .class ,
536
+ // The retry will use a BeginTransaction RPC.
537
+ BeginTransactionRequest .class ,
538
+ ExecuteSqlRequest .class ,
539
+ CommitRequest .class );
540
+ }
513
541
break ;
514
542
} catch (AbortedException e ) {
515
543
transactionContextFuture = manager .resetForRetryAsync ();
@@ -557,7 +585,11 @@ public void asyncTransactionManagerWaitsUntilAsyncUpdateHasFinished() throws Exc
557
585
executor )
558
586
.commitAsync ()
559
587
.get ();
560
- if (isMultiplexedSessionsEnabled ()) {
588
+ if (isMultiplexedSessionsEnabledForRW ()) {
589
+ assertThat (mockSpanner .getRequestTypes ())
590
+ .containsExactly (
591
+ CreateSessionRequest .class , ExecuteSqlRequest .class , CommitRequest .class );
592
+ } else if (isMultiplexedSessionsEnabled ()) {
561
593
assertThat (mockSpanner .getRequestTypes ())
562
594
.containsExactly (
563
595
CreateSessionRequest .class ,
@@ -678,7 +710,16 @@ public void asyncTransactionManagerFireAndForgetInvalidBatchUpdate() throws Exce
678
710
ExecuteBatchDmlRequest .class ,
679
711
ExecuteBatchDmlRequest .class ,
680
712
CommitRequest .class );
681
- if (isMultiplexedSessionsEnabled ()) {
713
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
714
+ ImmutableList .of (
715
+ CreateSessionRequest .class ,
716
+ ExecuteBatchDmlRequest .class ,
717
+ ExecuteBatchDmlRequest .class ,
718
+ CommitRequest .class );
719
+ if (isMultiplexedSessionsEnabledForRW ()) {
720
+ assertThat (mockSpanner .getRequestTypes ())
721
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
722
+ } else if (isMultiplexedSessionsEnabled ()) {
682
723
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
683
724
} else {
684
725
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -722,7 +763,17 @@ public void asyncTransactionManagerBatchUpdateAborted() throws Exception {
722
763
BeginTransactionRequest .class ,
723
764
ExecuteBatchDmlRequest .class ,
724
765
CommitRequest .class );
725
- if (isMultiplexedSessionsEnabled ()) {
766
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
767
+ ImmutableList .of (
768
+ CreateSessionRequest .class ,
769
+ ExecuteBatchDmlRequest .class ,
770
+ BeginTransactionRequest .class ,
771
+ ExecuteBatchDmlRequest .class ,
772
+ CommitRequest .class );
773
+ if (isMultiplexedSessionsEnabledForRW ()) {
774
+ assertThat (mockSpanner .getRequestTypes ())
775
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
776
+ } else if (isMultiplexedSessionsEnabled ()) {
726
777
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
727
778
} else {
728
779
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -764,7 +815,20 @@ public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() thro
764
815
BeginTransactionRequest .class ,
765
816
ExecuteBatchDmlRequest .class ,
766
817
CommitRequest .class );
767
- if (isMultiplexedSessionsEnabled ()) {
818
+ // When requests run using multiplexed session with read-write enabled, the
819
+ // BatchCreateSessionsRequest will not be
820
+ // triggered because we are creating an empty pool during initialization.
821
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
822
+ ImmutableList .of (
823
+ CreateSessionRequest .class ,
824
+ ExecuteBatchDmlRequest .class ,
825
+ BeginTransactionRequest .class ,
826
+ ExecuteBatchDmlRequest .class ,
827
+ CommitRequest .class );
828
+ if (isMultiplexedSessionsEnabledForRW ()) {
829
+ assertThat (mockSpanner .getRequestTypes ())
830
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
831
+ } else if (isMultiplexedSessionsEnabled ()) {
768
832
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
769
833
} else {
770
834
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -825,7 +889,18 @@ public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Excepti
825
889
BeginTransactionRequest .class ,
826
890
ExecuteBatchDmlRequest .class ,
827
891
CommitRequest .class );
828
- if (isMultiplexedSessionsEnabled ()) {
892
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
893
+ ImmutableList .of (
894
+ CreateSessionRequest .class ,
895
+ ExecuteBatchDmlRequest .class ,
896
+ CommitRequest .class ,
897
+ BeginTransactionRequest .class ,
898
+ ExecuteBatchDmlRequest .class ,
899
+ CommitRequest .class );
900
+ if (isMultiplexedSessionsEnabledForRW ()) {
901
+ assertThat (mockSpanner .getRequestTypes ())
902
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
903
+ } else if (isMultiplexedSessionsEnabled ()) {
829
904
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
830
905
} else {
831
906
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -867,27 +942,50 @@ public void asyncTransactionManagerBatchUpdateAbortedWithoutGettingResult() thro
867
942
assertThat (attempt .get ()).isEqualTo (2 );
868
943
List <Class <? extends AbstractMessage >> requests = mockSpanner .getRequestTypes ();
869
944
// Remove the CreateSession requests for multiplexed sessions, as those are not relevant for
870
- // this test.
871
- requests .removeIf (request -> request == CreateSessionRequest .class );
945
+ // this test if multiplexed session for read-write is not enabled.
946
+ if (!isMultiplexedSessionsEnabledForRW ()) {
947
+ requests .removeIf (request -> request == CreateSessionRequest .class );
948
+ }
872
949
int size = Iterables .size (requests );
873
950
assertThat (size ).isIn (Range .closed (5 , 6 ));
874
951
if (size == 5 ) {
875
- assertThat (requests )
876
- .containsExactly (
877
- BatchCreateSessionsRequest .class ,
878
- ExecuteBatchDmlRequest .class ,
879
- BeginTransactionRequest .class ,
880
- ExecuteBatchDmlRequest .class ,
881
- CommitRequest .class );
952
+ if (isMultiplexedSessionsEnabledForRW ()) {
953
+ assertThat (requests )
954
+ .containsExactly (
955
+ CreateSessionRequest .class ,
956
+ ExecuteBatchDmlRequest .class ,
957
+ BeginTransactionRequest .class ,
958
+ ExecuteBatchDmlRequest .class ,
959
+ CommitRequest .class );
960
+ } else {
961
+ assertThat (requests )
962
+ .containsExactly (
963
+ BatchCreateSessionsRequest .class ,
964
+ ExecuteBatchDmlRequest .class ,
965
+ BeginTransactionRequest .class ,
966
+ ExecuteBatchDmlRequest .class ,
967
+ CommitRequest .class );
968
+ }
882
969
} else {
883
- assertThat (requests )
884
- .containsExactly (
885
- BatchCreateSessionsRequest .class ,
886
- ExecuteBatchDmlRequest .class ,
887
- CommitRequest .class ,
888
- BeginTransactionRequest .class ,
889
- ExecuteBatchDmlRequest .class ,
890
- CommitRequest .class );
970
+ if (isMultiplexedSessionsEnabledForRW ()) {
971
+ assertThat (requests )
972
+ .containsExactly (
973
+ CreateSessionRequest .class ,
974
+ ExecuteBatchDmlRequest .class ,
975
+ CommitRequest .class ,
976
+ BeginTransactionRequest .class ,
977
+ ExecuteBatchDmlRequest .class ,
978
+ CommitRequest .class );
979
+ } else {
980
+ assertThat (requests )
981
+ .containsExactly (
982
+ BatchCreateSessionsRequest .class ,
983
+ ExecuteBatchDmlRequest .class ,
984
+ CommitRequest .class ,
985
+ BeginTransactionRequest .class ,
986
+ ExecuteBatchDmlRequest .class ,
987
+ CommitRequest .class );
988
+ }
891
989
}
892
990
}
893
991
@@ -918,7 +1016,13 @@ public void asyncTransactionManagerWithBatchUpdateCommitFails() {
918
1016
ImmutableList <Class <? extends Message >> expectedRequests =
919
1017
ImmutableList .of (
920
1018
BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
921
- if (isMultiplexedSessionsEnabled ()) {
1019
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
1020
+ ImmutableList .of (
1021
+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1022
+ if (isMultiplexedSessionsEnabledForRW ()) {
1023
+ assertThat (mockSpanner .getRequestTypes ())
1024
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
1025
+ } else if (isMultiplexedSessionsEnabled ()) {
922
1026
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
923
1027
} else {
924
1028
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -949,7 +1053,13 @@ public void asyncTransactionManagerWaitsUntilAsyncBatchUpdateHasFinished() throw
949
1053
ImmutableList <Class <? extends Message >> expectedRequests =
950
1054
ImmutableList .of (
951
1055
BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
952
- if (isMultiplexedSessionsEnabled ()) {
1056
+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
1057
+ ImmutableList .of (
1058
+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1059
+ if (isMultiplexedSessionsEnabledForRW ()) {
1060
+ assertThat (mockSpanner .getRequestTypes ())
1061
+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
1062
+ } else if (isMultiplexedSessionsEnabled ()) {
953
1063
assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
954
1064
} else {
955
1065
assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -1122,4 +1232,11 @@ private boolean isMultiplexedSessionsEnabled() {
1122
1232
}
1123
1233
return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSession ();
1124
1234
}
1235
+
1236
+ private boolean isMultiplexedSessionsEnabledForRW () {
1237
+ if (spanner .getOptions () == null || spanner .getOptions ().getSessionPoolOptions () == null ) {
1238
+ return false ;
1239
+ }
1240
+ return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSessionForRW ();
1241
+ }
1125
1242
}
0 commit comments