@@ -435,6 +435,9 @@ def test_get_empty(self):
435
435
self .assertTrue (pool ._sessions .empty ())
436
436
437
437
def test_spans_get_empty_pool (self ):
438
+ # This scenario tests a pool that hasn't been filled up
439
+ # and pool.get() acquires from a pool, waiting for a session
440
+ # to become available.
438
441
pool = self ._make_one ()
439
442
database = _Database ("name" )
440
443
session1 = _Session (database )
@@ -490,6 +493,10 @@ def test_spans_get_non_empty_session_exists(self):
490
493
"pool.Get" ,
491
494
attributes = TestBurstyPool .BASE_ATTRIBUTES ,
492
495
)
496
+ self .assertSpanEvents (
497
+ "pool.Get" ,
498
+ ["Acquiring session" , "Waiting for a session to become available" ],
499
+ )
493
500
494
501
def test_get_non_empty_session_expired (self ):
495
502
pool = self ._make_one ()
@@ -888,237 +895,6 @@ def test_spans_get_empty_pool(self):
888
895
self .assertSpanEvents ("pool.Get" , wantEventNames )
889
896
890
897
891
- class TestTransactionPingingPool (OpenTelemetryBase ):
892
- BASE_ATTRIBUTES = {
893
- "db.type" : "spanner" ,
894
- "db.url" : "spanner.googleapis.com" ,
895
- "db.instance" : "name" ,
896
- "net.host.name" : "spanner.googleapis.com" ,
897
- }
898
- enrich_with_otel_scope (BASE_ATTRIBUTES )
899
-
900
- def _getTargetClass (self ):
901
- from google .cloud .spanner_v1 .pool import TransactionPingingPool
902
-
903
- return TransactionPingingPool
904
-
905
- def _make_one (self , * args , ** kwargs ):
906
- return self ._getTargetClass ()(* args , ** kwargs )
907
-
908
- def test_ctor_defaults (self ):
909
- pool = self ._make_one ()
910
- self .assertIsNone (pool ._database )
911
- self .assertEqual (pool .size , 10 )
912
- self .assertEqual (pool .default_timeout , 10 )
913
- self .assertEqual (pool ._delta .seconds , 3000 )
914
- self .assertTrue (pool ._sessions .empty ())
915
- self .assertTrue (pool ._pending_sessions .empty ())
916
- self .assertEqual (pool .labels , {})
917
- self .assertIsNone (pool .database_role )
918
-
919
- def test_ctor_explicit (self ):
920
- labels = {"foo" : "bar" }
921
- database_role = "dummy-role"
922
- pool = self ._make_one (
923
- size = 4 ,
924
- default_timeout = 30 ,
925
- ping_interval = 1800 ,
926
- labels = labels ,
927
- database_role = database_role ,
928
- )
929
- self .assertIsNone (pool ._database )
930
- self .assertEqual (pool .size , 4 )
931
- self .assertEqual (pool .default_timeout , 30 )
932
- self .assertEqual (pool ._delta .seconds , 1800 )
933
- self .assertTrue (pool ._sessions .empty ())
934
- self .assertTrue (pool ._pending_sessions .empty ())
935
- self .assertEqual (pool .labels , labels )
936
- self .assertEqual (pool .database_role , database_role )
937
-
938
- def test_ctor_explicit_w_database_role_in_db (self ):
939
- database_role = "dummy-role"
940
- pool = self ._make_one ()
941
- database = pool ._database = _Database ("name" )
942
- SESSIONS = [_Session (database )] * 10
943
- database ._sessions .extend (SESSIONS )
944
- database ._database_role = database_role
945
- pool .bind (database )
946
- self .assertEqual (pool .database_role , database_role )
947
-
948
- def test_bind (self ):
949
- pool = self ._make_one ()
950
- database = _Database ("name" )
951
- SESSIONS = [_Session (database ) for _ in range (10 )]
952
- database ._sessions .extend (SESSIONS )
953
- pool .bind (database )
954
-
955
- self .assertIs (pool ._database , database )
956
- self .assertEqual (pool .size , 10 )
957
- self .assertEqual (pool .default_timeout , 10 )
958
- self .assertEqual (pool ._delta .seconds , 3000 )
959
- self .assertTrue (pool ._sessions .full ())
960
-
961
- api = database .spanner_api
962
- self .assertEqual (api .batch_create_sessions .call_count , 5 )
963
- for session in SESSIONS :
964
- session .create .assert_not_called ()
965
- txn = session ._transaction
966
- txn .begin .assert_not_called ()
967
-
968
- self .assertTrue (pool ._pending_sessions .empty ())
969
- self .assertNoSpans ()
970
-
971
- def test_bind_w_timestamp_race (self ):
972
- import datetime
973
- from google .cloud ._testing import _Monkey
974
- from google .cloud .spanner_v1 import pool as MUT
975
-
976
- NOW = datetime .datetime .utcnow ()
977
- pool = self ._make_one ()
978
- database = _Database ("name" )
979
- SESSIONS = [_Session (database ) for _ in range (10 )]
980
- database ._sessions .extend (SESSIONS )
981
-
982
- with _Monkey (MUT , _NOW = lambda : NOW ):
983
- pool .bind (database )
984
-
985
- self .assertIs (pool ._database , database )
986
- self .assertEqual (pool .size , 10 )
987
- self .assertEqual (pool .default_timeout , 10 )
988
- self .assertEqual (pool ._delta .seconds , 3000 )
989
- self .assertTrue (pool ._sessions .full ())
990
-
991
- api = database .spanner_api
992
- self .assertEqual (api .batch_create_sessions .call_count , 5 )
993
- for session in SESSIONS :
994
- session .create .assert_not_called ()
995
- txn = session ._transaction
996
- txn .begin .assert_not_called ()
997
-
998
- self .assertTrue (pool ._pending_sessions .empty ())
999
- self .assertNoSpans ()
1000
-
1001
- def test_put_full (self ):
1002
- import queue
1003
-
1004
- pool = self ._make_one (size = 4 )
1005
- database = _Database ("name" )
1006
- SESSIONS = [_Session (database ) for _ in range (4 )]
1007
- database ._sessions .extend (SESSIONS )
1008
- pool .bind (database )
1009
-
1010
- with self .assertRaises (queue .Full ):
1011
- pool .put (_Session (database ))
1012
-
1013
- self .assertTrue (pool ._sessions .full ())
1014
- self .assertNoSpans ()
1015
-
1016
- def test_put_non_full_w_active_txn (self ):
1017
- pool = self ._make_one (size = 1 )
1018
- session_queue = pool ._sessions = _Queue ()
1019
- pending = pool ._pending_sessions = _Queue ()
1020
- database = _Database ("name" )
1021
- session = _Session (database )
1022
- txn = session .transaction ()
1023
-
1024
- pool .put (session )
1025
-
1026
- self .assertEqual (len (session_queue ._items ), 1 )
1027
- _ , queued = session_queue ._items [0 ]
1028
- self .assertIs (queued , session )
1029
-
1030
- self .assertEqual (len (pending ._items ), 0 )
1031
- txn .begin .assert_not_called ()
1032
- self .assertNoSpans ()
1033
-
1034
- def test_put_non_full_w_committed_txn (self ):
1035
- pool = self ._make_one (size = 1 )
1036
- session_queue = pool ._sessions = _Queue ()
1037
- pending = pool ._pending_sessions = _Queue ()
1038
- database = _Database ("name" )
1039
- session = _Session (database )
1040
- committed = session .transaction ()
1041
- committed .committed = True
1042
-
1043
- pool .put (session )
1044
-
1045
- self .assertEqual (len (session_queue ._items ), 0 )
1046
-
1047
- self .assertEqual (len (pending ._items ), 1 )
1048
- self .assertIs (pending ._items [0 ], session )
1049
- self .assertIsNot (session ._transaction , committed )
1050
- session ._transaction .begin .assert_not_called ()
1051
- self .assertNoSpans ()
1052
-
1053
- def test_put_non_full (self ):
1054
- pool = self ._make_one (size = 1 )
1055
- session_queue = pool ._sessions = _Queue ()
1056
- pending = pool ._pending_sessions = _Queue ()
1057
- database = _Database ("name" )
1058
- session = _Session (database )
1059
-
1060
- pool .put (session )
1061
-
1062
- self .assertEqual (len (session_queue ._items ), 0 )
1063
- self .assertEqual (len (pending ._items ), 1 )
1064
- self .assertIs (pending ._items [0 ], session )
1065
-
1066
- self .assertFalse (pending .empty ())
1067
- self .assertNoSpans ()
1068
-
1069
- def test_begin_pending_transactions_empty (self ):
1070
- pool = self ._make_one (size = 1 )
1071
- pool .begin_pending_transactions () # no raise
1072
- self .assertNoSpans ()
1073
-
1074
- def test_begin_pending_transactions_non_empty (self ):
1075
- pool = self ._make_one (size = 1 )
1076
- pool ._sessions = _Queue ()
1077
-
1078
- database = _Database ("name" )
1079
- TRANSACTIONS = [_make_transaction (object ())]
1080
- PENDING_SESSIONS = [_Session (database , transaction = txn ) for txn in TRANSACTIONS ]
1081
-
1082
- pending = pool ._pending_sessions = _Queue (* PENDING_SESSIONS )
1083
- self .assertFalse (pending .empty ())
1084
-
1085
- pool .begin_pending_transactions () # no raise
1086
-
1087
- for txn in TRANSACTIONS :
1088
- txn .begin .assert_not_called ()
1089
-
1090
- self .assertTrue (pending .empty ())
1091
- self .assertNoSpans ()
1092
-
1093
- def test_spans_get_empty_pool (self ):
1094
- pool = self ._make_one ()
1095
- database = _Database ("name" )
1096
- session1 = _Session (database )
1097
- database ._sessions .append (session1 )
1098
- try :
1099
- pool .bind (database )
1100
- except Exception :
1101
- pass
1102
-
1103
- with trace_call ("pool.Get" , session1 ):
1104
- try :
1105
- pool .get ()
1106
- except Exception :
1107
- pass
1108
-
1109
- self .assertTrue (pool ._sessions .empty ())
1110
-
1111
- self .assertSpanAttributes (
1112
- "pool.Get" ,
1113
- attributes = TestTransactionPingingPool .BASE_ATTRIBUTES ,
1114
- )
1115
- wantEventNames = [
1116
- "Waiting for a session to become available" ,
1117
- "No session available" ,
1118
- ]
1119
- self .assertSpanEvents ("pool.Get" , wantEventNames )
1120
-
1121
-
1122
898
class TestSessionCheckout (unittest .TestCase ):
1123
899
def _getTargetClass (self ):
1124
900
from google .cloud .spanner_v1 .pool import SessionCheckout
0 commit comments