3
3
import json
4
4
import time
5
5
import unittest
6
+ import sys
6
7
7
8
import requests
8
9
11
12
from msal .oauth2cli import AuthCodeReceiver
12
13
13
14
logger = logging .getLogger (__name__ )
14
- logging .basicConfig (level = logging .INFO )
15
+ logging .basicConfig (level = logging .DEBUG if "-v" in sys . argv else logging . INFO )
15
16
16
17
17
18
def _get_app_and_auth_code (
@@ -49,7 +50,8 @@ def assertLoosely(self, response, assertion=None,
49
50
error_description = response .get ("error_description" )))
50
51
assertion ()
51
52
52
- def assertCacheWorksForUser (self , result_from_wire , scope , username = None ):
53
+ def assertCacheWorksForUser (
54
+ self , result_from_wire , scope , username = None , data = None ):
53
55
# You can filter by predefined username, or let end user to choose one
54
56
accounts = self .app .get_accounts (username = username )
55
57
self .assertNotEqual (0 , len (accounts ))
@@ -59,7 +61,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
59
61
set (scope ) <= set (result_from_wire ["scope" ].split (" " ))
60
62
):
61
63
# Going to test acquire_token_silent(...) to locate an AT from cache
62
- result_from_cache = self .app .acquire_token_silent (scope , account = account )
64
+ result_from_cache = self .app .acquire_token_silent (
65
+ scope , account = account , data = data or {})
63
66
self .assertIsNotNone (result_from_cache )
64
67
self .assertIsNone (
65
68
result_from_cache .get ("refresh_token" ), "A cache hit returns no RT" )
@@ -69,7 +72,8 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
69
72
70
73
# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
71
74
self .app .token_cache ._cache ["AccessToken" ] = {} # A hacky way to clear ATs
72
- result_from_cache = self .app .acquire_token_silent (scope , account = account )
75
+ result_from_cache = self .app .acquire_token_silent (
76
+ scope , account = account , data = data or {})
73
77
self .assertIsNotNone (result_from_cache ,
74
78
"We should get a result from acquire_token_silent(...) call" )
75
79
self .assertIsNotNone (
@@ -131,6 +135,84 @@ def _test_device_flow(
131
135
logger .info (
132
136
"%s obtained tokens: %s" , self .id (), json .dumps (result , indent = 4 ))
133
137
138
+ def _test_acquire_token_interactive (
139
+ self , client_id = None , authority = None , scope = None , port = None ,
140
+ username_uri = "" , # But you would want to provide one
141
+ data = None , # Needed by ssh-cert feature
142
+ ** ignored ):
143
+ assert client_id and authority and scope
144
+ self .app = msal .PublicClientApplication (
145
+ client_id , authority = authority , http_client = MinimalHttpClient ())
146
+ result = self .app .acquire_token_interactive (
147
+ scope ,
148
+ timeout = 120 ,
149
+ port = port ,
150
+ welcome_template = # This is an undocumented feature for testing
151
+ """<html><body><h1>{id}</h1><ol>
152
+ <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
153
+ <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
154
+ (replace the lab name with the labName from the link above).</li>
155
+ <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
156
+ </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
157
+ data = data or {},
158
+ )
159
+ logger .debug (
160
+ "%s: cache = %s, id_token_claims = %s" ,
161
+ self .id (),
162
+ json .dumps (self .app .token_cache ._cache , indent = 4 ),
163
+ json .dumps (result .get ("id_token_claims" ), indent = 4 ),
164
+ )
165
+ self .assertIn (
166
+ "access_token" , result ,
167
+ "{error}: {error_description}" .format (
168
+ # Note: No interpolation here, cause error won't always present
169
+ error = result .get ("error" ),
170
+ error_description = result .get ("error_description" )))
171
+ self .assertCacheWorksForUser (result , scope , username = None , data = data or {})
172
+ return result # For further testing
173
+
174
+
175
+ class SshCertTestCase (E2eTestCase ):
176
+ _JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
177
+ _JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
178
+ DATA1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : _JWK1 }
179
+ DATA2 = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : _JWK2 }
180
+ _SCOPE_USER = ["https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
181
+ _SCOPE_SP = ["https://pas.windows.net/CheckMyAccess/Linux/.default" ]
182
+ SCOPE = _SCOPE_SP # Historically there was a separation, at 2021 it is unified
183
+
184
+ def test_ssh_cert_for_service_principal (self ):
185
+ # Any SP can obtain an ssh-cert. Here we use the lab app.
186
+ result = get_lab_app ().acquire_token_for_client (self .SCOPE , data = self .DATA1 )
187
+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
188
+ result .get ("error" ), result .get ("error_description" )))
189
+ self .assertEqual ("ssh-cert" , result ["token_type" ])
190
+
191
+ @unittest .skipIf (os .getenv ("TRAVIS" ), "Browser automation is not yet implemented" )
192
+ def test_ssh_cert_for_user (self ):
193
+ result = self ._test_acquire_token_interactive (
194
+ client_id = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" , # Azure CLI is one
195
+ # of the only 2 clients that are PreAuthz to use ssh cert feature
196
+ authority = "https://login.microsoftonline.com/common" ,
197
+ scope = self .SCOPE ,
198
+ data = self .DATA1 ,
199
+ username_uri = "https://msidlab.com/api/user?usertype=cloud" ,
200
+ ) # It already tests reading AT from cache, and using RT to refresh
201
+ # acquire_token_silent() would work because we pass in the same key
202
+ self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
203
+ result .get ("error" ), result .get ("error_description" )))
204
+ self .assertEqual ("ssh-cert" , result ["token_type" ])
205
+ logger .debug ("%s.cache = %s" ,
206
+ self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
207
+
208
+ # refresh_token grant can fetch an ssh-cert bound to a different key
209
+ account = self .app .get_accounts ()[0 ]
210
+ refreshed_ssh_cert = self .app .acquire_token_silent (
211
+ self .SCOPE , account = account , data = self .DATA2 )
212
+ self .assertIsNotNone (refreshed_ssh_cert )
213
+ self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
214
+ self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
215
+
134
216
135
217
THIS_FOLDER = os .path .dirname (__file__ )
136
218
CONFIG = os .path .join (THIS_FOLDER , "config.json" )
@@ -190,48 +272,6 @@ def test_auth_code_with_mismatching_nonce(self):
190
272
self .app .acquire_token_by_authorization_code (
191
273
ac , self .config ["scope" ], redirect_uri = redirect_uri , nonce = "bar" )
192
274
193
- def test_ssh_cert (self ):
194
- self .skipUnlessWithConfig (["client_id" , "scope" ])
195
-
196
- JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
197
- JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
198
- data1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : JWK1 }
199
- ssh_test_slice = {
200
- "dc" : "prod-wst-test1" ,
201
- "slice" : "test" ,
202
- "sshcrt" : "true" ,
203
- }
204
-
205
- scopes = [ # Only this scope would result in an SSH-Cert
206
- "https://pas.windows.net/CheckMyAccess/Linux/user_impersonation" ]
207
- (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code (scopes = scopes )
208
-
209
- result = self .app .acquire_token_by_authorization_code (
210
- ac , scopes , redirect_uri = redirect_uri , data = data1 ,
211
- params = ssh_test_slice )
212
- self .assertIsNotNone (result .get ("access_token" ), "Encountered {}: {}" .format (
213
- result .get ("error" ), result .get ("error_description" )))
214
- self .assertEqual ("ssh-cert" , result ["token_type" ])
215
- logger .debug ("%s.cache = %s" ,
216
- self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
217
-
218
- # acquire_token_silent() needs to be passed the same key to work
219
- account = self .app .get_accounts ()[0 ]
220
- result_from_cache = self .app .acquire_token_silent (
221
- scopes , account = account , data = data1 )
222
- self .assertIsNotNone (result_from_cache )
223
- self .assertEqual (
224
- result ['access_token' ], result_from_cache ['access_token' ],
225
- "We should get the cached SSH-cert" )
226
-
227
- # refresh_token grant can fetch an ssh-cert bound to a different key
228
- refreshed_ssh_cert = self .app .acquire_token_silent (
229
- scopes , account = account , params = ssh_test_slice ,
230
- data = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : JWK2 })
231
- self .assertIsNotNone (refreshed_ssh_cert )
232
- self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
233
- self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
234
-
235
275
def test_client_secret (self ):
236
276
self .skipUnlessWithConfig (["client_id" , "client_secret" ])
237
277
self .app = msal .ConfidentialClientApplication (
@@ -445,39 +485,6 @@ def _test_acquire_token_by_auth_code_flow(
445
485
error_description = result .get ("error_description" )))
446
486
self .assertCacheWorksForUser (result , scope , username = None )
447
487
448
- def _test_acquire_token_interactive (
449
- self , client_id = None , authority = None , scope = None , port = None ,
450
- username_uri = "" , # But you would want to provide one
451
- ** ignored ):
452
- assert client_id and authority and scope
453
- self .app = msal .PublicClientApplication (
454
- client_id , authority = authority , http_client = MinimalHttpClient ())
455
- result = self .app .acquire_token_interactive (
456
- scope ,
457
- timeout = 60 ,
458
- port = port ,
459
- welcome_template = # This is an undocumented feature for testing
460
- """<html><body><h1>{id}</h1><ol>
461
- <li>Get a username from the upn shown at <a href="{username_uri}">here</a></li>
462
- <li>Get its password from https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
463
- (replace the lab name with the labName from the link above).</li>
464
- <li><a href="$auth_uri">Sign In</a> or <a href="$abort_uri">Abort</a></li>
465
- </ol></body></html>""" .format (id = self .id (), username_uri = username_uri ),
466
- )
467
- logger .debug (
468
- "%s: cache = %s, id_token_claims = %s" ,
469
- self .id (),
470
- json .dumps (self .app .token_cache ._cache , indent = 4 ),
471
- json .dumps (result .get ("id_token_claims" ), indent = 4 ),
472
- )
473
- self .assertIn (
474
- "access_token" , result ,
475
- "{error}: {error_description}" .format (
476
- # Note: No interpolation here, cause error won't always present
477
- error = result .get ("error" ),
478
- error_description = result .get ("error_description" )))
479
- self .assertCacheWorksForUser (result , scope , username = None )
480
-
481
488
def _test_acquire_token_obo (self , config_pca , config_cca ):
482
489
# 1. An app obtains a token representing a user, for our mid-tier service
483
490
pca = msal .PublicClientApplication (
0 commit comments