11
11
logging .basicConfig (level = logging .DEBUG )
12
12
13
13
14
- class TokenCacheTestCase (unittest .TestCase ):
14
+ # NOTE: These helpers were once implemented as static methods in TokenCacheTestCase.
15
+ # That would cause other test files' "from ... import TokenCacheTestCase"
16
+ # to re-run all test cases in this file.
17
+ # Now we avoid that, by defining these helpers in module level.
18
+ def build_id_token (
19
+ iss = "issuer" , sub = "subject" , aud = "my_client_id" , exp = None , iat = None ,
20
+ ** claims ): # AAD issues "preferred_username", ADFS issues "upn"
21
+ return "header.%s.signature" % base64 .b64encode (json .dumps (dict ({
22
+ "iss" : iss ,
23
+ "sub" : sub ,
24
+ "aud" : aud ,
25
+ "exp" : exp or (time .time () + 100 ),
26
+ "iat" : iat or time .time (),
27
+ }, ** claims )).encode ()).decode ('utf-8' )
28
+
15
29
16
- @staticmethod
17
- def build_id_token (
18
- iss = "issuer" , sub = "subject" , aud = "my_client_id" , exp = None , iat = None ,
19
- ** claims ): # AAD issues "preferred_username", ADFS issues "upn"
20
- return "header.%s.signature" % base64 .b64encode (json .dumps (dict ({
21
- "iss" : iss ,
22
- "sub" : sub ,
23
- "aud" : aud ,
24
- "exp" : exp or (time .time () + 100 ),
25
- "iat" : iat or time .time (),
26
- }, ** claims )).encode ()).decode ('utf-8' )
30
+ def build_response ( # simulate a response from AAD
31
+ uid = None , utid = None , # If present, they will form client_info
32
+ access_token = None , expires_in = 3600 , token_type = "some type" ,
33
+ ** kwargs # Pass-through: refresh_token, foci, id_token, error, refresh_in, ...
34
+ ):
35
+ response = {}
36
+ if uid and utid : # Mimic the AAD behavior for "client_info=1" request
37
+ response ["client_info" ] = base64 .b64encode (json .dumps ({
38
+ "uid" : uid , "utid" : utid ,
39
+ }).encode ()).decode ('utf-8' )
40
+ if access_token :
41
+ response .update ({
42
+ "access_token" : access_token ,
43
+ "expires_in" : expires_in ,
44
+ "token_type" : token_type ,
45
+ })
46
+ response .update (kwargs ) # Pass-through key-value pairs as top-level fields
47
+ return response
27
48
28
- @staticmethod
29
- def build_response ( # simulate a response from AAD
30
- uid = None , utid = None , # If present, they will form client_info
31
- access_token = None , expires_in = 3600 , token_type = "some type" ,
32
- ** kwargs # Pass-through: refresh_token, foci, id_token, error, refresh_in, ...
33
- ):
34
- response = {}
35
- if uid and utid : # Mimic the AAD behavior for "client_info=1" request
36
- response ["client_info" ] = base64 .b64encode (json .dumps ({
37
- "uid" : uid , "utid" : utid ,
38
- }).encode ()).decode ('utf-8' )
39
- if access_token :
40
- response .update ({
41
- "access_token" : access_token ,
42
- "expires_in" : expires_in ,
43
- "token_type" : token_type ,
44
- })
45
- response .update (kwargs ) # Pass-through key-value pairs as top-level fields
46
- return response
49
+
50
+ class TokenCacheTestCase (unittest .TestCase ):
47
51
48
52
def setUp (self ):
49
53
self .cache = TokenCache ()
50
54
51
55
def testAddByAad (self ):
52
56
client_id = "my_client_id"
53
- id_token = self . build_id_token (
57
+ id_token = build_id_token (
54
58
oid = "object1234" , preferred_username = "John Doe" , aud = client_id )
55
59
self .cache .add ({
56
60
"client_id" : client_id ,
57
61
"scope" : ["s2" , "s1" , "s3" ], # Not in particular order
58
62
"token_endpoint" : "https://login.example.com/contoso/v2/token" ,
59
- "response" : self . build_response (
63
+ "response" : build_response (
60
64
uid = "uid" , utid = "utid" , # client_info
61
65
expires_in = 3600 , access_token = "an access token" ,
62
66
id_token = id_token , refresh_token = "a refresh token" ),
@@ -125,12 +129,12 @@ def testAddByAad(self):
125
129
126
130
def testAddByAdfs (self ):
127
131
client_id = "my_client_id"
128
- id_token = self . build_id_token (
aud = client_id ,
upn = "[email protected] " )
132
+ id_token = build_id_token (
aud = client_id ,
upn = "[email protected] " )
129
133
self .cache .add ({
130
134
"client_id" : client_id ,
131
135
"scope" : ["s2" , "s1" , "s3" ], # Not in particular order
132
136
"token_endpoint" : "https://fs.msidlab8.com/adfs/oauth2/token" ,
133
- "response" : self . build_response (
137
+ "response" : build_response (
134
138
uid = None , utid = None , # ADFS will provide no client_info
135
139
expires_in = 3600 , access_token = "an access token" ,
136
140
id_token = id_token , refresh_token = "a refresh token" ),
@@ -204,7 +208,7 @@ def test_key_id_is_also_recorded(self):
204
208
"client_id" : "my_client_id" ,
205
209
"scope" : ["s2" , "s1" , "s3" ], # Not in particular order
206
210
"token_endpoint" : "https://login.example.com/contoso/v2/token" ,
207
- "response" : self . build_response (
211
+ "response" : build_response (
208
212
uid = "uid" , utid = "utid" , # client_info
209
213
expires_in = 3600 , access_token = "an access token" ,
210
214
refresh_token = "a refresh token" ),
@@ -219,7 +223,7 @@ def test_refresh_in_should_be_recorded_as_refresh_on(self): # Sounds weird. Yep
219
223
"client_id" : "my_client_id" ,
220
224
"scope" : ["s2" , "s1" , "s3" ], # Not in particular order
221
225
"token_endpoint" : "https://login.example.com/contoso/v2/token" ,
222
- "response" : self . build_response (
226
+ "response" : build_response (
223
227
uid = "uid" , utid = "utid" , # client_info
224
228
expires_in = 3600 , refresh_in = 1800 , access_token = "an access token" ,
225
229
), #refresh_token="a refresh token"),
0 commit comments