@@ -50,7 +50,8 @@ def assertLoosely(self, response, assertion=None,
50
50
error_description = response .get ("error_description" )))
51
51
assertion ()
52
52
53
- def assertCacheWorksForUser (self , result_from_wire , scope , username = None ):
53
+ def assertCacheWorksForUser (
54
+ self , result_from_wire , scope , username = None , data = None ):
54
55
# You can filter by predefined username, or let end user to choose one
55
56
accounts = self .app .get_accounts (username = username )
56
57
self .assertNotEqual (0 , len (accounts ))
@@ -60,7 +61,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
60
61
set (scope ) <= set (result_from_wire ["scope" ].split (" " ))
61
62
):
62
63
# Going to test acquire_token_silent(...) to locate an AT from cache
63
- result_from_cache = self .app .acquire_token_silent (scope , account = account )
64
+ result_from_cache = self .app .acquire_token_silent (
65
+ scope , account = account , data = data or {})
64
66
self .assertIsNotNone (result_from_cache )
65
67
self .assertIsNone (
66
68
result_from_cache .get ("refresh_token" ), "A cache hit returns no RT" )
@@ -70,7 +72,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
70
72
71
73
# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
72
74
self .app .token_cache ._cache ["AccessToken" ] = {} # A hacky way to clear ATs
73
- result_from_cache = self .app .acquire_token_silent (scope , account = account )
75
+ result_from_cache = self .app .acquire_token_silent (
76
+ scope , account = account , data = data or {})
74
77
self .assertIsNotNone (result_from_cache ,
75
78
"We should get a result from acquire_token_silent(...) call" )
76
79
self .assertIsNotNone (
@@ -132,6 +135,84 @@ def _test_device_flow(
132
135
logger .info (
133
136
"%s obtained tokens: %s" , self .id (), json .dumps (result , indent = 4 ))
134
137
138
+ def _test_acquire_token_interactive (
139
+ self , client_id = None , authority = None , scope = None , port = None ,
140
+ username_uri = "" , # But you would want to provide one
141
+ data = None , # Needed by ssh-cert feature
142
+ ** ignored ):
143
+ assert client_id and authority and scope
144
+ self .app = msal .PublicClientApplication (
145
+ client_id , authority = authority , http_client = MinimalHttpClient ())
146
+ result = self .app .acquire_token_interactive (
147
+ scope ,
148
+ timeout = 120 ,
149
+ port = port ,
150
+ welcome_template = # This is an undocumented feature for testing
151
+ """<html><body><h1>{id}</h1><ol>
152
+ <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
153
+ <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
154
+ (replace the lab name with the labName from the link above).</li>
155
+ <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
156
+ </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
157
+ data = data or {},
158
+ )
159
+ logger .debug (
160
+ "%s: cache = %s, id_token_claims = %s" ,
161
+ self .id (),
162
+ json .dumps (self .app .token_cache ._cache , indent = 4 ),
163
+ json .dumps (result .get ("id_token_claims" ), indent = 4 ),
164
+ )
165
+ self .assertIn (
166
+ "access_token" , result ,
167
+ "{error}: {error_description}" .format (
168
+ # Note: No interpolation here, cause error won't always present
169
+ error = result .get ("error" ),
170
+ error_description = result .get ("error_description" )))
171
+ self .assertCacheWorksForUser (result , scope , username = None , data = data or {})
172
+ return result # For further testing
173
+
174
+
175
+ class SshCertTestCase (E2eTestCase ):
176
+ _JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
177
+ _JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
178
+ DATA1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : _JWK1 }
179
+ DATA2 = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : _JWK2 }
180
+ _SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
181
+ _SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default" ]
182
+ SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified
183
+
184
+ def test_ssh_cert_for_service_principal (self ):
185
+ # Any SP can obtain an ssh-cert. Here we use the lab app.
186
+ result = get_lab_app ().acquire_token_for_client (self .SCOPE , data = self .DATA1 )
187
+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
188
+ result .get ("error" ), result .get ("error_description" )))
189
+ self .assertEqual ("ssh-cert" , result ["token_type" ])
190
+
191
+ @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
192
+ def test_ssh_cert_for_user (self ):
193
+ result = self ._test_acquire_token_interactive (
194
+ client_id = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" , # Azure CLI is one
195
+ # of the only 2 clients that are PreAuthz to use ssh cert feature
196
+ authority = "https://login.microsoftonline.com/common" ,
197
+ scope = self .SCOPE ,
198
+ data = self .DATA1 ,
199
+ username_uri = "https://msidlab.com/api/user?usertype=cloud" ,
200
+ ) # It already tests reading AT from cache, and using RT to refresh
201
+ # acquire_token_silent() would work because we pass in the same key
202
+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
203
+ result .get ("error" ), result .get ("error_description" )))
204
+ self .assertEqual ("ssh-cert" , result ["token_type" ])
205
+ logger .debug ("%s.cache = %s" ,
206
+ self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
207
+
208
+ # refresh_token grant can fetch an ssh-cert bound to a different key
209
+ account = self .app .get_accounts ()[0 ]
210
+ refreshed_ssh_cert = self .app .acquire_token_silent (
211
+ self .SCOPE , account = account , data = self .DATA2 )
212
+ self .assertIsNotNone (refreshed_ssh_cert )
213
+ self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
214
+ self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
215
+
135
216
136
217
THIS_FOLDER = os .path .dirname (__file__ )
137
218
CONFIG = os .path .join (THIS_FOLDER , "config.json" )
@@ -191,48 +272,6 @@ def test_auth_code_with_mismatching_nonce(self):
191
272
self .app .acquire_token_by_authorization_code (
192
273
ac , self .config ["scope" ], redirect_uri = redirect_uri , nonce = "bar" )
193
274
194
- def test_ssh_cert (self ):
195
- self .skipUnlessWithConfig (["client_id" , "scope" ])
196
-
197
- JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
198
- JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
199
- data1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : JWK1 }
200
- ssh_test_slice = {
201
- "dc" : "prod-wst-test1" ,
202
- "slice" : "test" ,
203
- "sshcrt" : "true" ,
204
- }
205
-
206
- scopes = [ # Only this scope would result in an SSH-Cert
207
- "https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
208
- (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code (scopes = scopes )
209
-
210
- result = self .app .acquire_token_by_authorization_code (
211
- ac , scopes , redirect_uri = redirect_uri , data = data1 ,
212
- params = ssh_test_slice )
213
- self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
214
- result .get ("error" ), result .get ("error_description" )))
215
- self .assertEqual ("ssh-cert" , result ["token_type" ])
216
- logger .debug ("%s.cache = %s" ,
217
- self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
218
-
219
- # acquire_token_silent() needs to be passed the same key to work
220
- account = self .app .get_accounts ()[0 ]
221
- result_from_cache = self .app .acquire_token_silent (
222
- scopes , account = account , data = data1 )
223
- self .assertIsNotNone (result_from_cache )
224
- self .assertEqual (
225
- result ['access_token' ], result_from_cache ['access_token' ],
226
- "We should get the cached SSH-cert" )
227
-
228
- # refresh_token grant can fetch an ssh-cert bound to a different key
229
- refreshed_ssh_cert = self .app .acquire_token_silent (
230
- scopes , account = account , params = ssh_test_slice ,
231
- data = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : JWK2 })
232
- self .assertIsNotNone (refreshed_ssh_cert )
233
- self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
234
- self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
235
-
236
275
def test_client_secret (self ):
237
276
self .skipUnlessWithConfig (["client_id" , "client_secret" ])
238
277
self .app = msal .ConfidentialClientApplication (
@@ -446,39 +485,6 @@ def _test_acquire_token_by_auth_code_flow(
446
485
error_description = result .get ("error_description" )))
447
486
self .assertCacheWorksForUser (result , scope , username = None )
448
487
449
- def _test_acquire_token_interactive (
450
- self , client_id = None , authority = None , scope = None , port = None ,
451
- username_uri = "" , # But you would want to provide one
452
- ** ignored ):
453
- assert client_id and authority and scope
454
- self .app = msal .PublicClientApplication (
455
- client_id , authority = authority , http_client = MinimalHttpClient ())
456
- result = self .app .acquire_token_interactive (
457
- scope ,
458
- timeout = 60 ,
459
- port = port ,
460
- welcome_template = # This is an undocumented feature for testing
461
- """<html><body><h1>{id}</h1><ol>
462
- <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
463
- <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
464
- (replace the lab name with the labName from the link above).</li>
465
- <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
466
- </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
467
- )
468
- logger .debug (
469
- "%s: cache = %s, id_token_claims = %s" ,
470
- self .id (),
471
- json .dumps (self .app .token_cache ._cache , indent = 4 ),
472
- json .dumps (result .get ("id_token_claims" ), indent = 4 ),
473
- )
474
- self .assertIn (
475
- "access_token" , result ,
476
- "{error}: {error_description}" .format (
477
- # Note: No interpolation here, cause error won't always present
478
- error = result .get ("error" ),
479
- error_description = result .get ("error_description" )))
480
- self .assertCacheWorksForUser (result , scope , username = None )
481
-
482
488
def _test_acquire_token_obo (self , config_pca , config_cca ):
483
489
# 1. An app obtains a token representing a user, for our mid-tier service
484
490
pca = msal .PublicClientApplication (
0 commit comments